#!/usr/bin/env python
# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.

"""Archives a set of files or directories to a server."""

__version__ = '0.3.1'

import functools
import hashlib
import json
import logging
import os
import re
import shutil
import stat
import sys
import tempfile
import threading
import time
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import net
from utils import threading_utils
from utils import tools

import auth


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'
# Version stored and expected in .isolated files.
ISOLATED_FILE_VERSION = '1.3'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
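# Example (an illustrative sketch, not part of the original module): how the
# schedule above plays out for 250 items; see Storage.batch_items_for_check()
# further below, which implements it.
#
#   batches = list(Storage.batch_items_for_check(items))  # 250 Item objects
#   [len(b) for b in batches]  # -> [20, 20, 50, 50, 50, 60]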


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
  'md5': hashlib.md5,
  'sha-1': hashlib.sha1,
  'sha-512': hashlib.sha512,
}


# Used for serialization.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
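# Example (illustrative sketch): the two tables above translate between wire
# names and hashlib constructors in both directions.
#
#   algo = SUPPORTED_ALGOS['sha-1']  # -> hashlib.sha1
#   SUPPORTED_ALGOS_REVERSE[algo]    # -> 'sha-1'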


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
  r'^.+\.(?:run_test_cases)$',
  r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


class ConfigError(ValueError):
  """Generic failure to load a .isolated file."""
  pass


class MappingError(OSError):
  """Failed to recreate the tree."""
  pass


def is_valid_hash(value, algo):
  """Returns if the value is a valid hash for the corresponding algorithm."""
  size = 2 * algo().digest_size
  return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))


def hash_file(filepath, algo):
  """Calculates the hash of a file without reading it all in memory at once.

  |algo| should be one of the hashlib hashing algorithms.
  """
  digest = algo()
  with open(filepath, 'rb') as f:
    while True:
      chunk = f.read(DISK_FILE_CHUNK)
      if not chunk:
        break
      digest.update(chunk)
  return digest.hexdigest()
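# Example (illustrative sketch; the path is hypothetical): hashing a file in
# constant memory with hash_file() and validating the digest string with
# is_valid_hash().
#
#   digest = hash_file('/tmp/payload.bin', hashlib.sha1)
#   assert is_valid_hash(digest, hashlib.sha1)  # 40 hex chars for SHA-1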


def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    data = stream.read(chunk_size)
    if not data:
      break
    yield data


def file_read(filepath, chunk_size=DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  if not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
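# Example (illustrative sketch; paths are hypothetical): file_write() accepts
# any iterable of byte chunks, so it composes directly with file_read() to
# copy a file in constant memory.
#
#   copied = file_write('/tmp/copy.bin', file_read('/tmp/payload.bin'))
#   # |copied| is the total number of bytes written.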


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
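# Example (illustrative sketch): zip_compress() and zip_decompress() are
# symmetric generator pipelines, so a round trip restores the original bytes.
#
#   chunks = ['hello ', 'world']
#   assert ''.join(zip_decompress(zip_compress(chunks))) == 'hello world'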


def get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  # Strip the leading dot returned by splitext() so the extension actually
  # matches the dot-less entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      os.mkdir(os.path.join(base_directory, d))


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101


def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), actual_size, size)
    return False
  return True


class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  RETRIES = 5

  def __init__(self):
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')


class Item(object):
  """An item to push to Storage.

  It starts its life in a main thread, travels to 'contains' thread, then to
  'push' thread and then finally back to the main thread.

  It is never used concurrently from multiple threads.
  """

  def __init__(self, digest, size, is_isolated=False):
    self.digest = digest
    self.size = size
    self.is_isolated = is_isolated
    self.compression_level = 6
    self.push_state = None

  def content(self, chunk_size):
    """Iterable with content of this item in chunks of given size.

    Arguments:
      chunk_size: preferred size of the chunk to produce, may be ignored.
    """
    raise NotImplementedError()


class FileItem(Item):
  """A file to push to Storage."""

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self, chunk_size):
    return file_read(self.path, chunk_size)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, is_isolated=False):
    super(BufferItem, self).__init__(
        algo(buf).hexdigest(), len(buf), is_isolated)
    self.buffer = buf

  def content(self, _chunk_size):
    return [self.buffer]
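# Example (illustrative sketch): the two concrete Item flavors. FileItem
# streams its content from disk, while BufferItem hashes an in-memory blob at
# construction time.
#
#   item = BufferItem('serialized .isolated data', hashlib.sha1)
#   item.digest  # hex SHA-1 of the buffer
#   item.size    # length of the buffer, in bytes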


class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi."""

  def __init__(self, storage_api, use_zip):
    self.use_zip = use_zip
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    return False

  def upload_tree(self, indir, infiles):
    """Uploads the given tree to the isolate server.

    Arguments:
      indir: root directory the infiles are based in.
      infiles: dict of files to upload from |indir|.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))

    # Convert |indir| + |infiles| into a list of FileItem objects.
    # Filter out symlinks, since they are not represented by items on isolate
    # server side.
    items = [
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            is_isolated=metadata.get('priority') == '0')
        for filepath, metadata in infiles.iteritems()
        if 'l' not in metadata
    ]

    return self.upload_items(items)

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    Will upload only items that are missing.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    channel = threading_utils.TaskChannel()
    for missing_item in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(
          channel,
          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
          missing_item)

    uploaded = []
    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total:      %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if the underlying protocol doesn't support this.
    """
    return self._storage_api.get_fetch_url(digest)

  def async_push(self, channel, priority, item):
    """Starts asynchronous push to the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      priority: thread pool task priority for the push.
      item: item to upload as instance of Item class.
    """
    def push(content):
      """Pushes an item and returns its id, to pass as a result to |channel|."""
      self._storage_api.push(item, content)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push,
          item.content(DISK_FILE_CHUNK))
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
                              item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self.use_zip:
          stream = zip_decompress(stream, DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      Item objects that are missing from the server.
    """
    channel = threading_utils.TaskChannel()
    pending = 0
    # Enqueue all requests.
    for batch in self.batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1
    # Yield results as they come in.
    for _ in xrange(pending):
      for missing in channel.pull():
        yield missing

  @staticmethod
  def batch_items_for_check(items):
    """Splits list of items to check for existence on the server into batches.

    Each batch corresponds to a single 'exists?' query to the server via a call
    to StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects.

    Yields:
      Batches of items to query for existence in a single operation,
      each batch is a list of Item objects.
    """
    batch_count = 0
    batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
    next_queries = []
    for item in sorted(items, key=lambda x: x.size, reverse=True):
      next_queries.append(item)
      if len(next_queries) == batch_size_limit:
        yield next_queries
        next_queries = []
        batch_count += 1
        batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    if next_queries:
      yield next_queries
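# Example (illustrative sketch): the push pipeline from a caller's point of
# view; |storage| would come from get_storage(), defined further below.
#
#   missing = list(storage.get_missing_items(items))  # batched contains()
#   uploaded = storage.upload_items(items)            # pushes only the missing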


class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
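# Example (illustrative sketch; digests and sizes are hypothetical): fetching
# two items through a FetchQueue and blocking until the first one lands in
# the cache.
#
#   queue = FetchQueue(storage, MemoryCache())
#   queue.add(WorkerPool.MED, digest_a, size_a)
#   queue.add(WorkerPool.MED, digest_b, size_b)
#   first_done = queue.wait([digest_a, digest_b])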


class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise MappingError('Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations."""

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, content):
    """Uploads an |item| with content generated by |content| generator.

    Arguments:
      item: Item object that holds information about an item being pushed.
      content: a generator that yields chunks to push.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for existence of given |items| on the server.

    Mutates |items| by assigning an opaque implementation-specific object to
    the Item's push_state attribute on missing entries in the datastore.

    Arguments:
      items: list of Item objects.

    Returns:
      A list of items missing on server as a list of Item objects.
    """
    raise NotImplementedError()


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  """

  class _PushState(object):
    """State needed to call .push(), to be stored in Item.push_state."""
    def __init__(self, upload_url, finalize_url):
      self.upload_url = upload_url
      self.finalize_url = finalize_url
      self.uploaded = False
      self.finalized = False

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self.base_url = base_url.rstrip('/')
    self.namespace = namespace
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if self._server_caps is None:
        request_body = json.dumps(
            self._generate_handshake_request(), separators=(',', ':'))
        response = net.url_read(
            url=self.base_url + '/content-gs/handshake',
            data=request_body,
            content_type='application/json',
            method='POST')
        if response is None:
          raise MappingError('Failed to perform handshake.')
        try:
          caps = json.loads(response)
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise MappingError('Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  def get_fetch_url(self, digest):
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self.base_url, self.namespace, digest)

  def fetch(self, digest, offset=0):
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        source_url,
        retry_404=True,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, content):
    assert isinstance(item, Item)
    assert isinstance(item.push_state, IsolateServer._PushState)
    assert not item.push_state.finalized

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap the |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after a failed finalization call
    # below; no need to reupload contents in that case.
    if not item.push_state.uploaded:
      # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=item.push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, item.push_state.upload_url))
      item.push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if item.push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      response = net.url_read(
          url=item.push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      item.push_state.finalized = True

  def contains(self, items):
    logging.info('Checking existence of %d files...', len(items))

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.is_isolated),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self.base_url,
        self.namespace,
        urllib.quote(self._server_capabilities['access_token']))
    response_body = net.url_read(
        url=query_url,
        data=json.dumps(body, separators=(',', ':')),
        content_type='application/json',
        method='POST')
    if response_body is None:
      raise MappingError('Failed to execute /pre-upload query')

    # Response body is a list of push_urls (or null if file is already present).
    try:
      response = json.loads(response_body)
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response_body))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = []
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        item = items[i]
        assert item.push_state is None
        item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
        missing_items.append(item)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001060
1061
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001062class FileSystem(StorageApi):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001063 """StorageApi implementation that fetches data from the file system.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001064
1065 The common use case is a NFS/CIFS file server that is mounted locally that is
1066 used to fetch the file on a local partition.
1067 """
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001068
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001069 def __init__(self, base_path):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001070 super(FileSystem, self).__init__()
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001071 self.base_path = base_path
1072
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001073 def get_fetch_url(self, digest):
1074 return None
1075
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001076 def fetch(self, digest, offset=0):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001077 assert isinstance(digest, basestring)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001078 return file_read(os.path.join(self.base_path, digest), offset=offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001079
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001080 def push(self, item, content):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001081 assert isinstance(item, Item)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001082 file_write(os.path.join(self.base_path, item.digest), content)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001083
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001084 def contains(self, items):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001085 return [
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001086 item for item in items
1087 if not os.path.exists(os.path.join(self.base_path, item.digest))
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001088 ]


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self):
    super(MemoryCache, self).__init__()
    # Let's not assume dict is thread safe.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no filenode to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      # Ignores all other bits.
      os.chmod(dest, file_mode & 0500)
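# Example (illustrative sketch; the digest is hypothetical): MemoryCache
# implements the full LocalCache interface, so it can stand in for a disk
# based cache in tests.
#
#   with MemoryCache() as cache:
#     cache.write('deadbeef', ['chunk1', 'chunk2'])
#     assert cache.read('deadbeef') == 'chunk1chunk2'
#     assert 'deadbeef' in cache.cached_set()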


def get_hash_algo(_namespace):
  """Return hash algorithm class to use when uploading to given |namespace|."""
  # TODO(vadimsh): Implement this at some point.
  return hashlib.sha1


def is_namespace_with_compression(namespace):
  """Returns True if given |namespace| stores compressed objects."""
  return namespace.endswith(('-gzip', '-deflate'))


def get_storage_api(file_or_url, namespace):
  """Returns an object that implements the StorageApi interface."""
  if file_path.is_url(file_or_url):
    return IsolateServer(file_or_url, namespace)
  else:
    return FileSystem(file_or_url)


def get_storage(file_or_url, namespace):
  """Returns a Storage instance configured with an appropriate StorageApi."""
  return Storage(
      get_storage_api(file_or_url, namespace),
      is_namespace_with_compression(namespace))
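# Example (illustrative sketch; the server URL is hypothetical): the usual
# entry point. Storage is a context manager that joins its thread pools on
# exit.
#
#   algo = get_hash_algo('default-gzip')
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     storage.upload_items([BufferItem('some blob', algo)])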


def expand_symlinks(indir, relfile):
  """Follows symlinks in |relfile|, but treats symlinks that point outside the
  build tree as if they were ordinary directories/files. Returns the final
  symlink-free target and a list of paths to symlinks encountered in the
  process.

  The rule about symlinks outside the build tree is for the benefit of the
  Chromium OS ebuild, which symlinks the output directory to an unrelated path
  in the chroot.

  Fails when a directory loop is detected, although in theory we could support
  that case.
  """
  is_directory = relfile.endswith(os.path.sep)
  done = indir
  todo = relfile.strip(os.path.sep)
  symlinks = []

  while todo:
    pre_symlink, symlink, post_symlink = file_path.split_at_symlink(
        done, todo)
    if not symlink:
      todo = file_path.fix_native_path_case(done, todo)
      done = os.path.join(done, todo)
      break
    symlink_path = os.path.join(done, pre_symlink, symlink)
    post_symlink = post_symlink.lstrip(os.path.sep)
    # readlink doesn't exist on Windows.
    # pylint: disable=E1101
    target = os.path.normpath(os.path.join(done, pre_symlink))
    symlink_target = os.readlink(symlink_path)
    if os.path.isabs(symlink_target):
      # Absolute paths are considered normal directories. The use case is
      # generally someone who puts the output directory on a separate drive.
      target = symlink_target
    else:
      # The symlink itself could be using the wrong path case.
      target = file_path.fix_native_path_case(target, symlink_target)

    if not os.path.exists(target):
      raise MappingError(
          'Symlink target doesn\'t exist: %s -> %s' % (symlink_path, target))
    target = file_path.get_native_path_case(target)
    if not file_path.path_starts_with(indir, target):
      done = symlink_path
      todo = post_symlink
      continue
    if file_path.path_starts_with(target, symlink_path):
      raise MappingError(
          'Can\'t map recursive symlink reference %s -> %s' %
          (symlink_path, target))
    logging.info('Found symlink: %s -> %s', symlink_path, target)
    symlinks.append(os.path.relpath(symlink_path, indir))
    # Treat the common prefix of the old and new paths as done, and start
    # scanning again.
    target = target.split(os.path.sep)
    symlink_path = symlink_path.split(os.path.sep)
    prefix_length = 0
    for target_piece, symlink_path_piece in zip(target, symlink_path):
      if target_piece == symlink_path_piece:
        prefix_length += 1
      else:
        break
    done = os.path.sep.join(target[:prefix_length])
    todo = os.path.join(
        os.path.sep.join(target[prefix_length:]), post_symlink)

  relfile = os.path.relpath(done, indir)
  relfile = relfile.rstrip(os.path.sep) + is_directory * os.path.sep
  return relfile, symlinks


def expand_directory_and_symlink(indir, relfile, blacklist, follow_symlinks):
  """Expands a single input. It can result in multiple outputs.

  This function is recursive when relfile is a directory.

  Note: this code doesn't properly handle recursive symlinks like the one
  created with:
    ln -s .. foo
  """
  if os.path.isabs(relfile):
    raise MappingError('Can\'t map absolute path %s' % relfile)

  infile = file_path.normpath(os.path.join(indir, relfile))
  if not infile.startswith(indir):
    raise MappingError('Can\'t map file %s outside %s' % (infile, indir))

  filepath = os.path.join(indir, relfile)
  native_filepath = file_path.get_native_path_case(filepath)
  if filepath != native_filepath:
    # Special case './'.
    if filepath != native_filepath + '.' + os.path.sep:
      # Give up enforcing strict path case on OSX. Really, it's that sad. The
      # case where it happens is very specific and hard to reproduce:
      # get_native_path_case(
      #    u'Foo.framework/Versions/A/Resources/Something.nib') will return
      # u'Foo.framework/Versions/A/resources/Something.nib', i.e. with a
      # lowercase 'r'.
      #
      # Note that this is really something deep in OSX because running
      #  ls Foo.framework/Versions/A
      # will print out 'Resources', while file_path.get_native_path_case()
      # returns a lower case 'r'.
      #
      # So *something* is happening under the hood that makes the command 'ls'
      # and Carbon.File.FSPathMakeRef('path').FSRefMakePath() disagree. We
      # have no idea why.
      if sys.platform != 'darwin':
        raise MappingError(
            'File path doesn\'t equal native file path\n%s != %s' %
            (filepath, native_filepath))

  symlinks = []
  if follow_symlinks:
    relfile, symlinks = expand_symlinks(indir, relfile)

  if relfile.endswith(os.path.sep):
    if not os.path.isdir(infile):
      raise MappingError(
          '%s is not a directory but ends with "%s"' % (infile, os.path.sep))

    # Special case './'.
    if relfile.startswith('.' + os.path.sep):
      relfile = relfile[2:]
    outfiles = symlinks
    try:
      for filename in os.listdir(infile):
        inner_relfile = os.path.join(relfile, filename)
        if blacklist and blacklist(inner_relfile):
          continue
        if os.path.isdir(os.path.join(indir, inner_relfile)):
          inner_relfile += os.path.sep
        outfiles.extend(
            expand_directory_and_symlink(indir, inner_relfile, blacklist,
                                         follow_symlinks))
      return outfiles
    except OSError as e:
      raise MappingError(
          'Unable to iterate over directory %s.\n%s' % (infile, e))
  else:
    # Always add individual files even if they were blacklisted.
    if os.path.isdir(infile):
      raise MappingError(
          'Input directory %s must have a trailing slash' % infile)

    if not os.path.isfile(infile):
      raise MappingError('Input file %s doesn\'t exist' % infile)

    return symlinks + [relfile]
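
# Example (illustrative; paths and blacklist are hypothetical): expanding a
# directory yields the relative path of every file under it, plus any symlinks
# crossed along the way:
#
#   deps = expand_directory_and_symlink(
#       u'/src/out/Release', u'./', lambda rel: rel.endswith('.pyc'), True)
#   # e.g. deps == [u'browser_tests', u'test_data/page.html', ...]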


def process_input(filepath, prevdict, read_only, flavor, algo):
  """Processes an input file, a dependency, and returns metadata about it.

  Behaviors:
  - Retrieves the file mode, file size, file timestamp and file link
    destination if it is a file link, and calculates the SHA-1 of the file's
    content if the path points to a file and not a symlink.

  Arguments:
    filepath: File to act on.
    prevdict: the previous dictionary. It is used to retrieve the cached sha-1
        to skip recalculating the hash. Optional.
    read_only: If 1 or 2, the file mode is manipulated. In practice, only one
        of 4 modes is saved: 0755 (rwx), 0644 (rw), 0555 (rx), 0444 (r). On
        windows, mode is not set since all files are 'executable' by default.
    flavor: One isolated flavor, like 'linux', 'mac' or 'win'.
    algo: Hashing algorithm used.

  Returns:
    The necessary data to create an entry in the 'files' section of an
    .isolated file.
  """
  out = {}
  # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
  # if prevdict.get('T') == True:
  #   # The file's content is ignored. Skip the time and hard code mode.
  #   if get_flavor() != 'win':
  #     out['m'] = stat.S_IRUSR | stat.S_IRGRP
  #   out['s'] = 0
  #   out['h'] = algo().hexdigest()
  #   out['T'] = True
  #   return out

  # Always check the file stat and check if it is a link. The timestamp is used
  # to know if the file's content/symlink destination should be looked into.
  # E.g. only reuse from prevdict if the timestamp hasn't changed.
  # There is the risk of the file's timestamp being reset to its last value
  # manually while its content changed. We don't protect against that use case.
  try:
    filestats = os.lstat(filepath)
  except OSError:
    # The file is not present.
    raise MappingError('%s is missing' % filepath)
  is_link = stat.S_ISLNK(filestats.st_mode)

  if flavor != 'win':
    # Ignore file mode on Windows since it's not really useful there.
    filemode = stat.S_IMODE(filestats.st_mode)
    # Remove write access for group and all access to 'others'.
    filemode &= ~(stat.S_IWGRP | stat.S_IRWXO)
    if read_only:
      filemode &= ~stat.S_IWUSR
    if filemode & stat.S_IXUSR:
      filemode |= stat.S_IXGRP
    else:
      filemode &= ~stat.S_IXGRP
    if not is_link:
      out['m'] = filemode

  # Used to skip recalculating the hash or link destination. Use the most
  # recent update time.
  # TODO(maruel): Save it in the .state file instead of .isolated so the
  # .isolated file is deterministic.
  out['t'] = int(round(filestats.st_mtime))

  if not is_link:
    out['s'] = filestats.st_size
    # If the timestamp wasn't updated and the file size is still the same,
    # carry over the sha-1.
    if (prevdict.get('t') == out['t'] and
        prevdict.get('s') == out['s']):
      # Reuse the previous hash if available.
      out['h'] = prevdict.get('h')
    if not out.get('h'):
      out['h'] = hash_file(filepath, algo)
  else:
    # If the timestamp wasn't updated, carry over the link destination.
    if prevdict.get('t') == out['t']:
      # Reuse the previous link destination if available.
      out['l'] = prevdict.get('l')
    if out.get('l') is None:
      # The link could be in an incorrect path case. In practice, this only
      # happens on OSX on case-insensitive HFS.
      # TODO(maruel): It'd be better if it was only done once, in
      # expand_directory_and_symlink(), so it would not be necessary to do it
      # again here.
      symlink_value = os.readlink(filepath)  # pylint: disable=E1101
      filedir = file_path.get_native_path_case(os.path.dirname(filepath))
      native_dest = file_path.fix_native_path_case(filedir, symlink_value)
      out['l'] = os.path.relpath(native_dest, filedir)
  return out
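
# Example (illustrative; the path, timestamp and digest are made up): for a
# regular read-only executable on Linux, the returned entry looks like:
#
#   meta = process_input(
#       u'/src/out/Release/browser_tests', {}, 1, 'linux', hashlib.sha1)
#   # meta == {'m': 0555, 't': 1391720000, 's': 12345678, 'h': '<sha-1 hex>'}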


def save_isolated(isolated, data):
  """Writes one or multiple .isolated files.

  Note: this reference implementation does not create child .isolated files so
  it always returns an empty list.

  Returns the list of child isolated files that are included by |isolated|.
  """
  # Make sure the data is valid .isolated data by 'reloading' it.
  algo = SUPPORTED_ALGOS[data['algo']]
  load_isolated(json.dumps(data), data.get('flavor'), algo)
  tools.write_json(isolated, data, True)
  return []


def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The base url, it is assumed that |base_url|/has/ can be used to
        query if an element was already uploaded, and |base_url|/store/ can be
        used to upload a new element.
    indir: Root directory the infiles are based in.
    infiles: dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server.
  """
  with get_storage(base_url, namespace) as storage:
    storage.upload_tree(indir, infiles)
  return 0


def load_isolated(content, os_flavor, algo):
  """Verifies the .isolated file is valid and returns the parsed json data.

  Arguments:
  - content: raw serialized content to load.
  - os_flavor: OS to load this file on. Optional.
  - algo: hashlib algorithm class. Used to confirm the algorithm matches the
          algorithm used on the Isolate Server.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  # Check 'version' first, since it could modify the rest of the parsing.
  # TODO(maruel): Drop support for unversioned .isolated file around Jan 2014.
  value = data.get('version', ISOLATED_FILE_VERSION)
  if not isinstance(value, basestring):
    raise ConfigError('Expected string, got %r' % value)
  if not re.match(r'^(\d+)\.(\d+)$', value):
    raise ConfigError('Expected a compatible version, got %r' % value)
  if value.split('.', 1)[0] != ISOLATED_FILE_VERSION.split('.', 1)[0]:
    raise ConfigError(
        'Expected compatible \'%s\' version, got %r' %
        (ISOLATED_FILE_VERSION, value))

  if algo is None:
    # TODO(maruel): Remove the default around Jan 2014.
    # Default to the algorithm used in the .isolated file itself, falling back
    # to 'sha-1' if unspecified.
    algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]

  for key, value in data.iteritems():
    if key == 'algo':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
      if value not in SUPPORTED_ALGOS:
        raise ConfigError(
            'Expected one of \'%s\', got %r' %
            (', '.join(sorted(SUPPORTED_ALGOS)), value))
      if value != SUPPORTED_ALGOS_REVERSE[algo]:
        raise ConfigError(
            'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))

    elif key == 'command':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            if not is_valid_hash(subsubvalue, algo):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            if not isinstance(subsubvalue, (int, long)):
              raise ConfigError('Expected int or long, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        if bool('h' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
              subvalue)
        if bool('h' in subvalue) != bool('s' in subvalue):
          raise ConfigError(
              'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
              subvalue)
        if bool('s' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
              subvalue)
        if bool('l' in subvalue) and bool('m' in subvalue):
          raise ConfigError(
              'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not is_valid_hash(subvalue, algo):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if value not in (0, 1, 2):
        raise ConfigError('Expected 0, 1 or 2, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      if os_flavor and value != os_flavor:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (os_flavor, value))

    elif key == 'version':
      # Already checked above.
      pass

    else:
      raise ConfigError('Unknown key %r' % key)

  # Automatically fix os.path.sep if necessary. While .isolated files are
  # always in the native path format, someone could want to download an
  # .isolated tree from another OS.
  wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
  if 'files' in data:
    data['files'] = dict(
        (k.replace(wrong_path_sep, os.path.sep), v)
        for k, v in data['files'].iteritems())
    for v in data['files'].itervalues():
      if 'l' in v:
        v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
  if 'relative_cwd' in data:
    data['relative_cwd'] = data['relative_cwd'].replace(
        wrong_path_sep, os.path.sep)
  return data
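
# Example (illustrative; the digest is made up): a minimal document accepted by
# load_isolated(). 'h' and 's' must both be present for a regular file, while a
# symlink uses 'l' alone:
#
#   _EXAMPLE = json.dumps({
#     'algo': 'sha-1',
#     'command': ['./browser_tests'],
#     'files': {
#       'browser_tests': {
#         'h': '0123456789abcdef0123456789abcdef01234567',
#         'm': 0555,
#         's': 12345678,
#       },
#     },
#     'version': ISOLATED_FILE_VERSION,
#   })
#   # load_isolated(_EXAMPLE, None, hashlib.sha1) returns the parsed dict.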


class IsolatedFile(object):
  """Represents a single parsed .isolated file."""
  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)', obj_hash)
    self.obj_hash = obj_hash
    self.algo = algo
    # Set once all the left-side of the tree is parsed. 'Tree' here means the
    # .isolate and all the .isolated files recursively included by it with
    # 'includes' key. The order of each sha-1 in 'includes', each representing
    # a .isolated file in the hash table, is important, as the later ones are
    # not processed until the earlier ones are retrieved and read.
    self.can_fetch = False

    # Raw data.
    self.data = {}
    # An IsolatedFile instance, one per object in self.includes.
    self.children = []

    # Set once the .isolated file is loaded.
    self._is_parsed = False
    # Set once the files are fetched.
    self.files_fetched = False

  def load(self, os_flavor, content):
    """Verifies the .isolated file is valid and loads this object with the
    json data.
    """
    logging.debug('IsolatedFile.load(%s)', self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    self.children = [
        IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
    ]
    self._is_parsed = True

  def fetch_files(self, fetch_queue, files):
    """Adds files in this .isolated file not present in |files| dictionary.

    Preemptively request files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if not self._is_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)', self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in files:
        files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
    self.files_fetched = True


class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long link of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(WorkerPool.HIGH, h)

    retrieve(self.root)

    while pending:
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    self.relative_cwd = self.relative_cwd or ''

  def _traverse_tree(self, fetch_queue, node):
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetchable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    node.fetch_files(fetch_queue, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def fetch_isolated(
    isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    algo: hash algorithm to use.
    outdir: Output directory to map file tree to.
    os_flavor: OS flavor to choose when reading sections of *.isolated file.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about loaded *.isolated file.
  """
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not is_valid_hash(isolated_hash, algo):
        isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)

      # Load all *.isolated and start loading rest of the files.
      settings.load(fetch_queue, isolated_hash, os_flavor, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueued and there's
        # no easy way to cancel them.
        raise ConfigError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_symlinks(outdir, settings.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in settings.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
                   fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise MappingError('Cache is too small to hold all requested files')
  return settings
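
# Example (illustrative; the digest and paths are hypothetical): materializing
# an isolated tree into a scratch directory, much like CMDdownload below:
#
#   settings = fetch_isolated(
#       isolated_hash='0123456789abcdef0123456789abcdef01234567',
#       storage=get_storage('https://isolate.example.com', 'default-gzip'),
#       cache=MemoryCache(),
#       algo=hashlib.sha1,
#       outdir='/tmp/run',
#       os_flavor=None,
#       require_command=False)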


def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  metadata = dict(
      (relpath, process_input(
          os.path.join(root, relpath), {}, False, sys.platform, algo))
      for relpath in expand_directory_and_symlink(
          root, './', blacklist, True))
  for v in metadata.itervalues():
    v.pop('t')
  items = [
      FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          is_isolated=relpath.endswith('.isolated'))
      for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata
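
# Example (illustrative; the path is hypothetical): |items| is the list of
# FileItem objects ready for upload, |metadata| maps each relative path to its
# .isolated 'files' entry:
#
#   items, metadata = directory_to_metadata(
#       u'/src/out/Release/test_data', hashlib.sha1, lambda _rel: False)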


def archive(storage, algo, files, blacklist):
  """Stores every entry and returns the relevant data."""
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(filepath, algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
              'algo': SUPPORTED_ALGOS_REVERSE[algo],
              'files': metadata,
              'version': ISOLATED_FILE_VERSION,
          }
          save_isolated(isolated, data)
          h = hash_file(isolated, algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  is_isolated=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = hash_file(filepath, algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  is_isolated=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about the uploaded files but we don't much in
    # practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    if tempdir:
      shutil.rmtree(tempdir)
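
# Example (illustrative; the server URL and path are hypothetical): archiving a
# directory programmatically, mirroring what CMDarchive below does:
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as s:
#     results = archive(
#         s, hashlib.sha1, [u'/src/out/Release/test_data'],
#         tools.gen_blacklist(DEFAULT_BLACKLIST))
#   # results is a list of (isolated hash, file path) pairs.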


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)

  if files == ['-']:
    # Strip the trailing line separators, otherwise the paths wouldn't resolve.
    files = [l.rstrip('\n\r') for l in sys.stdin.readlines()]

  if not files:
    parser.error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  algo = get_hash_algo(options.namespace)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(options.isolate_server, options.namespace) as storage:
      results = archive(storage, algo, files, blacklist)
  except Error as e:
    parser.error(e.args[0])
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
  return 0


def CMDdownload(parser, args):
  """Downloads data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, '
           'use --file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append',
      nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)

  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            WorkerPool.MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      settings = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=MemoryCache(),
          algo=get_hash_algo(options.namespace),
          outdir=options.target,
          os_flavor=None,
          require_command=False)
      rel = os.path.join(options.target, settings.relative_cwd)
      print('To run this test please run from the directory %s:' % rel)
      print(' ' + ' '.join(settings.command))

  return 0


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use or path to a remote directory. '
           'Defaults to the environment variable ISOLATE_SERVER if set.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(parser, options):
  """Processes the --isolate-server option and aborts if not specified.

  Values that are not URLs are accepted as-is and treated as directory paths.
  """
  if not options.isolate_server:
    parser.error('--isolate-server is required.')
  if file_path.is_url(options.isolate_server):
    parts = urlparse.urlparse(options.isolate_server)
    if parts.query:
      parser.error('--isolate-server doesn\'t support query parameter.')
    if parts.fragment:
      parser.error('--isolate-server doesn\'t support fragment in the url.')
    options.isolate_server = options.isolate_server.rstrip('/')


class OptionParserIsolateServer(tools.OptionParserWithLogging):
  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    add_isolate_server_options(self)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    process_isolate_server_options(self, options)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  try:
    return dispatcher.execute(OptionParserIsolateServer(), args)
  except Exception as e:
    tools.report_error(e)
    return 1


if __name__ == '__main__':
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))