blob: 0a75158f38d931442fddf5f6b60de2fc0063066d [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05006"""Archives a set of files or directories to a server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05008__version__ = '0.3'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import hashlib
maruel@chromium.org41601642013-09-18 19:40:46 +000012import json
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050016import shutil
17import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050019import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000020import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000022import urllib
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000023import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000024
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000025from third_party import colorama
26from third_party.depot_tools import fix_encoding
27from third_party.depot_tools import subcommand
28
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050029from utils import file_path
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000030from utils import net
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000031from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000032from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000033
34
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'
# Version stored and expected in .isolated files.
ISOLATED_FILE_VERSION = '1.1'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
# NOTE(review): entries are bare extensions without the leading dot; compare
# accordingly.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
  'md5': hashlib.md5,
  'sha-1': hashlib.sha1,
  'sha-512': hashlib.sha512,
}


# Used for serialization: maps hashlib constructor back to its wire name.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())


# Regexps of files never archived by default.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
  r'^.+\.(?:run_test_cases)$',
  r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
121
class Error(Exception):
  """Base exception for generic runtime failures in this module."""
125
126
class ConfigError(ValueError):
  """Raised when a .isolated file cannot be loaded or is malformed."""
130
131
class MappingError(OSError):
  """Raised when the file tree cannot be recreated on disk."""
135
136
def is_valid_hash(value, algo):
  """Returns if the value is a valid hash for the corresponding algorithm.

  A valid digest is exactly 2 * digest_size hexadecimal characters.
  """
  hex_len = algo().digest_size * 2
  pattern = r'^[a-fA-F0-9]{%d}$' % hex_len
  return re.match(pattern, value) is not None
141
142
def hash_file(filepath, algo):
  """Calculates the hash of a file without reading it all in memory at once.

  |algo| should be one of hashlib hashing algorithm.
  """
  hasher = algo()
  with open(filepath, 'rb') as stream:
    # Read fixed-size chunks until EOF (read() returns an empty string).
    for chunk in iter(lambda: stream.read(DISK_FILE_CHUNK), b''):
      hasher.update(chunk)
  return hasher.hexdigest()
156
157
def stream_read(stream, chunk_size):
  """Yields successive |chunk_size| chunks read from |stream| until EOF."""
  chunk = stream.read(chunk_size)
  while chunk:
    yield chunk
    chunk = stream.read(chunk_size)
165
166
def file_read(filepath, chunk_size=DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as stream:
    if offset:
      stream.seek(offset)
    chunk = stream.read(chunk_size)
    while chunk:
      yield chunk
      chunk = stream.read(chunk_size)
177
178
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  # |filedir| is '' when |filepath| is a bare file name; os.makedirs('')
  # raises OSError, so only create directories when there is one to create.
  if filedir and not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for chunk in content_generator:
      total += len(chunk)
      f.write(chunk)
  return total
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000197
198
def zip_compress(content_generator, level=7):
  """Yields the zlib-compressed form of chunks from |content_generator|."""
  compressor = zlib.compressobj(level)
  for piece in content_generator:
    out = compressor.compress(piece)
    if out:
      yield out
  # Flush whatever the compressor still buffers to terminate the stream.
  remainder = compressor.flush(zlib.Z_FINISH)
  if remainder:
    yield remainder
209
210
def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      # The second argument caps output size; input that would produce more
      # than |chunk_size| bytes is parked in decompressor.unconsumed_tail.
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # Drain the parked input in |chunk_size| increments before reading more.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    # End of input: emit whatever the decompressor still buffers.
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
240
241
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store) for file types that are already compressed, 7 otherwise.
  """
  # os.path.splitext() keeps the leading dot ('.zip'), but
  # ALREADY_COMPRESSED_TYPES lists bare extensions ('zip'), so the dot must
  # be stripped or the membership test below never matches.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
247
248
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file (relative paths).
  dirs = set()
  for filepath in files:
    parent = os.path.dirname(filepath)
    while parent and parent not in dirs:
      dirs.add(parent)
      parent = os.path.dirname(parent)
  # Sorted order guarantees a parent is created before any of its children.
  for d in sorted(dirs):
    os.mkdir(os.path.join(base_directory, d))
261
262
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files.

  Entries without an 'l' property are not symlinks and are skipped.
  """
  for path, props in files:
    if 'l' not in props:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', path)
      continue
    destination = os.path.join(base_directory, path)
    # os.symlink() doesn't exist on Windows.
    os.symlink(props['l'], destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000275
276
def is_valid_file(filepath, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual = os.stat(filepath).st_size
  if actual == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual, size)
  return False
291
292
class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # Number of automatic retries on IOError before giving up on a task.
  RETRIES = 5

  def __init__(self):
    # NOTE(review): positional args follow AutoRetryThreadPool's signature —
    # (retried exceptions, retries, initial, max, queue size, thread prefix);
    # confirm against utils/threading_utils.py.
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000310
311
class Item(object):
  """An item to push to Storage.

  It starts its life in a main thread, travels to 'contains' thread, then to
  'push' thread and then finally back to the main thread.

  It is never used concurrently from multiple threads.
  """

  def __init__(self, digest, size, is_isolated=False):
    # Hex digest identifying the content.
    self.digest = digest
    # Size of the content in bytes.
    self.size = size
    # True for .isolated files, which get upload priority.
    self.is_isolated = is_isolated
    # Default zlib level; subclasses may override per file type.
    self.compression_level = 6
    # Opaque slot used by StorageApi implementations during upload.
    self.push_state = None

  def content(self, chunk_size):
    """Iterable with content of this item in chunks of given size.

    Arguments:
      chunk_size: preferred size of the chunk to produce, may be ignored.

    Subclasses must override this.
    """
    raise NotImplementedError()
335
336
class FileItem(Item):
  """A file on disk to push to Storage."""

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    # Absolute or relative path of the file to read.
    self.path = path
    # Already-compressed file types are stored, not recompressed.
    self.compression_level = get_zip_compression_level(path)

  def content(self, chunk_size):
    """Yields the file content lazily in |chunk_size| pieces."""
    return file_read(self.path, chunk_size)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000347
348
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, algo, is_isolated=False):
    digest = algo(buf).hexdigest()
    super(BufferItem, self).__init__(digest, len(buf), is_isolated)
    self.buffer = buf

  def content(self, _chunk_size):
    """Returns the whole buffer as one chunk; |_chunk_size| is ignored."""
    return [self.buffer]
359
360
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi."""

  def __init__(self, storage_api, use_zip):
    # True to zlib-compress data on the wire (see async_push/async_fetch).
    self.use_zip = use_zip
    self._storage_api = storage_api
    # Pools are created lazily, see cpu_thread_pool/net_thread_pool.
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface; always waits via close(), never suppresses."""
    self.close()
    return False

  def upload_tree(self, indir, infiles):
    """Uploads the given tree to the isolate server.

    Arguments:
      indir: root directory the infiles are based in.
      infiles: dict of files to upload from |indir|.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))

    # Convert |indir| + |infiles| into a list of FileItem objects.
    # Filter out symlinks, since they are not represented by items on isolate
    # server side.
    items = [
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            is_isolated=metadata.get('priority') == '0')
        for filepath, metadata in infiles.iteritems()
        if 'l' not in metadata
    ]

    return self.upload_items(items)

  def upload_items(self, items):
    """Uploads bunch of items to the isolate server.

    Will upload only items that are missing.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks. Isolated files get HIGH priority so dependents
    # can start processing them sooner.
    missing = set()
    channel = threading_utils.TaskChannel()
    for missing_item in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(
          channel,
          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
          missing_item)

    uploaded = []
    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    # NOTE(review): the percentage math divides by |total| and |total_size|;
    # an empty |items| list would raise ZeroDivisionError on |total|.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    return self._storage_api.get_fetch_url(digest)

  def async_push(self, channel, priority, item):
    """Starts asynchronous push to the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      priority: thread pool task priority for the push.
      item: item to upload as instance of Item class.
    """
    def push(content):
      """Pushes an item and returns its id, to pass as a result to |channel|."""
      self._storage_api.push(item, content)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push,
          item.content(DISK_FILE_CHUNK))
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
            item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Propagate the failure to whoever is pulling from |channel|.
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self.use_zip:
          stream = zip_decompress(stream, DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      Item objects that are missing from the server.
    """
    channel = threading_utils.TaskChannel()
    pending = 0
    # Enqueue all requests.
    for batch in self.batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1
    # Yield results as they come in.
    for _ in xrange(pending):
      for missing in channel.pull():
        yield missing

  @staticmethod
  def batch_items_for_check(items):
    """Splits list of items to check for existence on the server into batches.

    Each batch corresponds to a single 'exists?' query to the server via a call
    to StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects.

    Yields:
      Batches of items to query for existence in a single operation,
      each batch is a list of Item objects.
    """
    batch_count = 0
    batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
    next_queries = []
    # Largest files first: size is used as a proxy for "likely changed".
    for item in sorted(items, key=lambda x: x.size, reverse=True):
      next_queries.append(item)
      if len(next_queries) == batch_size_limit:
        yield next_queries
        next_queries = []
        batch_count += 1
        batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    if next_queries:
      yield next_queries
634
635
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    # Completed fetches are reported back through this channel.
    self._channel = threading_utils.TaskChannel()
    # Digests currently being fetched.
    self._pending = set()
    # Every digest ever requested through add(); used by verify_all_cached().
    self._accessed = set()
    # Digests already present in the cache.
    self._fetched = cache.cached_set()

  def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching; the fetched stream is written straight into the cache.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching. Items not in |digests|
    # are still recorded as fetched so a later wait() finds them immediately.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
727
728
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    # Iterable of str chunks as produced by StorageApi.fetch().
    self.stream = stream
    # Expected total size in bytes, or UNKNOWN_FILE_SIZE to skip the check.
    self.expected_size = expected_size
    # Running count of bytes seen so far.
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        # |stored| is definitely not the last chunk: a newer one exists.
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          # IOError here is raised by the consumer (via generator.throw) while
          # writing the chunk; rewrap so Storage does not retry the fetch.
          raise MappingError('Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      # Loop ended: |stored| is the final chunk; check the total size now.
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    # Size verification is only possible when the stream is fully consumed and
    # the expected size was known in advance.
    if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
772
773
class StorageApi(object):
  """Abstract interface for low-level storage operations.

  Concrete implementations know how to fetch and push hash-addressed blobs
  to a particular backend (an isolate server, a mounted directory, ...).
  """

  def get_fetch_url(self, digest):
    """Maps a hex |digest| to a direct-download URL.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL, or None when the underlying protocol has no notion of URLs.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Downloads an item and yields its content in chunks.

    Arguments:
      digest: hash digest of the item to download.
      offset: byte offset from the start of the file to resume fetch from.

    Yields:
      str chunks of the downloaded item.
    """
    raise NotImplementedError()

  def push(self, item, content):
    """Uploads |item| using the chunks produced by the |content| generator.

    Arguments:
      item: Item object describing what is being pushed.
      content: generator yielding str chunks to upload.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Determines which of the given |items| are absent from the server.

    As a side effect, assigns an opaque implementation-specific object to the
    push_state attribute of every Item missing from the datastore.

    Arguments:
      items: list of Item objects to query.

    Returns:
      The subset of |items| missing on the server, as a list of Item objects.
    """
    raise NotImplementedError()
825
826
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  """

  class _PushState(object):
    """State needed to call .push(), to be stored in Item.push_state."""
    def __init__(self, upload_url, finalize_url):
      # URL to PUT the raw content to.
      self.upload_url = upload_url
      # Optional URL to POST to once the content upload completed; falsy when
      # the server requires no finalization step.
      self.finalize_url = finalize_url
      # True once the content bytes were successfully uploaded.
      self.uploaded = False
      # True once the finalize call succeeded.
      self.finalized = False

  def __init__(self, base_url, namespace):
    """Creates a client for |namespace| on the server at |base_url|."""
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self.base_url = base_url.rstrip('/')
    self.namespace = namespace
    # Guards lazy initialization of |_server_caps|.
    self._lock = threading.Lock()
    # Cached /handshake response; None until the first request needing it.
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if self._server_caps is None:
        request_body = json.dumps(
            self._generate_handshake_request(), separators=(',', ':'))
        response = net.url_read(
            url=self.base_url + '/content-gs/handshake',
            data=request_body,
            content_type='application/json',
            method='POST')
        if response is None:
          raise MappingError('Failed to perform handshake.')
        try:
          caps = json.loads(response)
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise MappingError('Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  def get_fetch_url(self, digest):
    """Returns the /content-gs/retrieve URL for |digest| in this namespace."""
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self.base_url, self.namespace, digest)

  def fetch(self, digest, offset=0):
    """Streams the item's content from the server; see StorageApi.fetch."""
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        source_url,
        retry_404=True,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Unable to open connection to %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, content):
    """Uploads |item| content and finalizes the upload; see StorageApi.push.

    Safe to call again after a failed finalization: the content upload is
    skipped when push_state.uploaded is already True.
    """
    assert isinstance(item, Item)
    assert isinstance(item.push_state, IsolateServer._PushState)
    assert not item.push_state.finalized

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not item.push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=item.push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, item.push_state.upload_url))
      item.push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if item.push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      response = net.url_read(
          url=item.push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      item.push_state.finalized = True

  def contains(self, items):
    """Queries the server for missing items; see StorageApi.contains."""
    logging.info('Checking existence of %d files...', len(items))

    # Request body is a json encoded list of dicts.
    # Keys: 'h' is the digest, 's' the size, 'i' whether it is an .isolated.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.is_isolated),
        } for item in items
    ]

    # The handshake access token authenticates the pre-upload query.
    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self.base_url,
        self.namespace,
        urllib.quote(self._server_capabilities['access_token']))
    response_body = net.url_read(
        url=query_url,
        data=json.dumps(body, separators=(',', ':')),
        content_type='application/json',
        method='POST')
    if response_body is None:
      raise MappingError('Failed to execute /pre-upload query')

    # Response body is a list of push_urls (or null if file is already present).
    try:
      response = json.loads(response_body)
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response_body))

    # Pick Items that are missing, attach _PushState to them.
    # Response entries are positional: response[i] corresponds to items[i].
    missing_items = []
    for i, push_urls in enumerate(response):
      if push_urls:
        # Two URLs per missing item: [upload_url, finalize_url].
        assert len(push_urls) == 2, str(push_urls)
        item = items[i]
        assert item.push_state is None
        item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
        missing_items.append(item)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001057
1058
class FileSystem(StorageApi):
  """StorageApi that reads and writes blobs in a local directory.

  The common use case is an NFS/CIFS file server mounted locally, so a
  "remote" store is reached through ordinary file system calls.
  """

  def __init__(self, base_path):
    super(FileSystem, self).__init__()
    # Directory holding one file per digest.
    self.base_path = base_path

  def get_fetch_url(self, digest):
    # A plain directory has no URL-based access.
    return None

  def fetch(self, digest, offset=0):
    assert isinstance(digest, basestring)
    path = os.path.join(self.base_path, digest)
    return file_read(path, offset=offset)

  def push(self, item, content):
    assert isinstance(item, Item)
    file_write(os.path.join(self.base_path, item.digest), content)

  def contains(self, items):
    # An item exists iff its digest-named file exists under |base_path|.
    missing = []
    for item in items:
      if not os.path.exists(os.path.join(self.base_path, item.digest)):
        missing.append(item)
    return missing
1086
1087
class LocalCache(object):
  """Interface for a local store of objects fetched via Storage.

  Implementations may be called concurrently from multiple threads and must
  therefore guard their internal state with a lock.
  """

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a new set holding the digests of all currently cached items."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Checks integrity of an item and refreshes its LRU position.

    Arguments:
      digest: hash digest of the item to check.
      size: expected size of the item.

    Returns:
      True when the item is cached and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes the item from the cache; no-op when it is absent."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns the whole content of a cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Consumes the |content| chunk generator and stores it under |digest|."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()
1138
1139
class MemoryCache(LocalCache):
  """LocalCache implementation that keeps all content in a plain dict."""

  def __init__(self):
    super(MemoryCache, self).__init__()
    # Dict access is not assumed to be thread safe; guard it explicitly.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    # In-memory data cannot rot, so |size| is not verified here.
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Join the chunk stream outside of the critical section.
    blob = ''.join(content)
    with self._lock:
      self._contents[digest] = blob

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no filenode to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      # Only the read and execute bits for user/group are kept.
      os.chmod(dest, file_mode & 0o500)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001177
1178
def get_hash_algo(_namespace):
  """Returns the hashing algorithm class used when uploading to |namespace|.

  Currently every namespace uses SHA-1; the argument exists for future
  per-namespace selection.
  """
  # TODO(vadimsh): Implement this at some point.
  return hashlib.sha1
1183
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001184
def is_namespace_with_compression(namespace):
  """Tells whether objects in |namespace| are stored in compressed form.

  Compression is signaled by the namespace name's suffix.
  """
  compressed_suffixes = ('-gzip', '-deflate')
  return namespace.endswith(compressed_suffixes)
1188
1189
def get_storage_api(file_or_url, namespace):
  """Instantiates the StorageApi implementation matching |file_or_url|.

  URLs are served by IsolateServer; anything else is treated as a local or
  mounted directory and served by FileSystem.
  """
  if not file_path.is_url(file_or_url):
    return FileSystem(file_or_url)
  return IsolateServer(file_or_url, namespace)
1196
1197
def get_storage(file_or_url, namespace):
  """Builds a Storage wrapping the right StorageApi for |file_or_url|.

  Compression is enabled based on the namespace naming convention.
  """
  api = get_storage_api(file_or_url, namespace)
  use_compression = is_namespace_with_compression(namespace)
  return Storage(api, use_compression)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001203
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001204
def expand_symlinks(indir, relfile):
  """Follows symlinks in |relfile|, but treating symlinks that point outside the
  build tree as if they were ordinary directories/files. Returns the final
  symlink-free target and a list of paths to symlinks encountered in the
  process.

  The rule about symlinks outside the build tree is for the benefit of the
  Chromium OS ebuild, which symlinks the output directory to an unrelated path
  in the chroot.

  Fails when a directory loop is detected, although in theory we could support
  that case.
  """
  # Remember whether the caller referred to a directory; the trailing
  # separator is restored on the result at the end.
  is_directory = relfile.endswith(os.path.sep)
  # |done| is the resolved prefix of the path, |todo| the still-unresolved rest.
  done = indir
  todo = relfile.strip(os.path.sep)
  symlinks = []

  while todo:
    pre_symlink, symlink, post_symlink = file_path.split_at_symlink(
        done, todo)
    if not symlink:
      # No symlink left in |todo|: normalize the path case and finish.
      todo = file_path.fix_native_path_case(done, todo)
      done = os.path.join(done, todo)
      break
    symlink_path = os.path.join(done, pre_symlink, symlink)
    post_symlink = post_symlink.lstrip(os.path.sep)
    # readlink doesn't exist on Windows.
    # pylint: disable=E1101
    target = os.path.normpath(os.path.join(done, pre_symlink))
    symlink_target = os.readlink(symlink_path)
    if os.path.isabs(symlink_target):
      # Absolute path are considered a normal directories. The use case is
      # generally someone who puts the output directory on a separate drive.
      target = symlink_target
    else:
      # The symlink itself could be using the wrong path case.
      target = file_path.fix_native_path_case(target, symlink_target)

    if not os.path.exists(target):
      raise MappingError(
          'Symlink target doesn\'t exist: %s -> %s' % (symlink_path, target))
    target = file_path.get_native_path_case(target)
    if not file_path.path_starts_with(indir, target):
      # Target lies outside the build tree: treat the symlink like a plain
      # directory/file and keep resolving the remainder of the path.
      done = symlink_path
      todo = post_symlink
      continue
    if file_path.path_starts_with(target, symlink_path):
      # The symlink points at (or below) itself: a loop we refuse to map.
      raise MappingError(
          'Can\'t map recursive symlink reference %s -> %s' %
          (symlink_path, target))
    logging.info('Found symlink: %s -> %s', symlink_path, target)
    symlinks.append(os.path.relpath(symlink_path, indir))
    # Treat the common prefix of the old and new paths as done, and start
    # scanning again.
    target = target.split(os.path.sep)
    symlink_path = symlink_path.split(os.path.sep)
    prefix_length = 0
    for target_piece, symlink_path_piece in zip(target, symlink_path):
      if target_piece == symlink_path_piece:
        prefix_length += 1
      else:
        break
    done = os.path.sep.join(target[:prefix_length])
    todo = os.path.join(
        os.path.sep.join(target[prefix_length:]), post_symlink)

  relfile = os.path.relpath(done, indir)
  # Re-append the trailing separator when the input denoted a directory.
  relfile = relfile.rstrip(os.path.sep) + is_directory * os.path.sep
  return relfile, symlinks
1275
1276
def expand_directory_and_symlink(indir, relfile, blacklist, follow_symlinks):
  """Expands a single input. It can result in multiple outputs.

  This function is recursive when relfile is a directory.

  Note: this code doesn't properly handle recursive symlink like one created
  with:
    ln -s .. foo

  Arguments:
    indir: root directory all results are relative to.
    relfile: relative path to expand; a trailing os.path.sep marks a directory.
    blacklist: optional callable; entries discovered inside a directory for
               which it returns True are skipped. The explicitly named
               |relfile| itself is never filtered (see else-branch below).
    follow_symlinks: when True, symlinks inside the tree are resolved via
                     expand_symlinks() and reported alongside the files.
  """
  if os.path.isabs(relfile):
    raise MappingError('Can\'t map absolute path %s' % relfile)

  infile = file_path.normpath(os.path.join(indir, relfile))
  if not infile.startswith(indir):
    raise MappingError('Can\'t map file %s outside %s' % (infile, indir))

  filepath = os.path.join(indir, relfile)
  native_filepath = file_path.get_native_path_case(filepath)
  if filepath != native_filepath:
    # Special case './'.
    if filepath != native_filepath + '.' + os.path.sep:
      # Give up enforcing strict path case on OSX. Really, it's that sad. The
      # case where it happens is very specific and hard to reproduce:
      #  get_native_path_case(
      #    u'Foo.framework/Versions/A/Resources/Something.nib') will return
      #  u'Foo.framework/Versions/A/resources/Something.nib', e.g. lowercase 'r'.
      #
      # Note that this is really something deep in OSX because running
      #  ls Foo.framework/Versions/A
      # will print out 'Resources', while file_path.get_native_path_case()
      # returns a lower case 'r'.
      #
      # So *something* is happening under the hood resulting in the command 'ls'
      # and Carbon.File.FSPathMakeRef('path').FSRefMakePath() to disagree. We
      # have no idea why.
      if sys.platform != 'darwin':
        raise MappingError(
            'File path doesn\'t equal native file path\n%s != %s' %
            (filepath, native_filepath))

  symlinks = []
  if follow_symlinks:
    relfile, symlinks = expand_symlinks(indir, relfile)

  if relfile.endswith(os.path.sep):
    if not os.path.isdir(infile):
      raise MappingError(
          '%s is not a directory but ends with "%s"' % (infile, os.path.sep))

    # Special case './'.
    if relfile.startswith('.' + os.path.sep):
      relfile = relfile[2:]
    # Start from the symlinks found so far and append every expanded entry.
    outfiles = symlinks
    try:
      for filename in os.listdir(infile):
        inner_relfile = os.path.join(relfile, filename)
        if blacklist and blacklist(inner_relfile):
          continue
        if os.path.isdir(os.path.join(indir, inner_relfile)):
          # Mark directories with a trailing separator so the recursive call
          # takes the directory branch.
          inner_relfile += os.path.sep
        outfiles.extend(
            expand_directory_and_symlink(indir, inner_relfile, blacklist,
                                         follow_symlinks))
      return outfiles
    except OSError as e:
      raise MappingError(
          'Unable to iterate over directory %s.\n%s' % (infile, e))
  else:
    # Always add individual files even if they were blacklisted.
    if os.path.isdir(infile):
      raise MappingError(
          'Input directory %s must have a trailing slash' % infile)

    if not os.path.isfile(infile):
      raise MappingError('Input file %s doesn\'t exist' % infile)

    return symlinks + [relfile]
1354
1355
def process_input(filepath, prevdict, read_only, flavor, algo):
  """Processes an input file, a dependency, and return meta data about it.

  Behaviors:
  - Retrieves the file mode, file size, file timestamp, file link
    destination if it is a file link and calcultate the SHA-1 of the file's
    content if the path points to a file and not a symlink.

  Arguments:
    filepath: File to act on.
    prevdict: the previous dictionary. It is used to retrieve the cached sha-1
              to skip recalculating the hash. Optional.
    read_only: If True, the file mode is manipulated. In practice, only save
               one of 4 modes: 0755 (rwx), 0644 (rw), 0555 (rx), 0444 (r). On
               windows, mode is not set since all files are 'executable' by
               default.
    flavor: One isolated flavor, like 'linux', 'mac' or 'win'.
    algo: Hashing algorithm used.

  Returns:
    The necessary data to create a entry in the 'files' section of an .isolated
    file. Keys used: 'm' file mode, 't' mtime, 's' size, 'h' content hash,
    'l' symlink destination.
  """
  out = {}
  # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
  # if prevdict.get('T') == True:
  #   # The file's content is ignored. Skip the time and hard code mode.
  #   if get_flavor() != 'win':
  #     out['m'] = stat.S_IRUSR | stat.S_IRGRP
  #   out['s'] = 0
  #   out['h'] = algo().hexdigest()
  #   out['T'] = True
  #   return out

  # Always check the file stat and check if it is a link. The timestamp is used
  # to know if the file's content/symlink destination should be looked into.
  # E.g. only reuse from prevdict if the timestamp hasn't changed.
  # There is the risk of the file's timestamp being reset to its last value
  # manually while its content changed. We don't protect against that use case.
  try:
    # lstat, not stat: a symlink must be described as a link, not its target.
    filestats = os.lstat(filepath)
  except OSError:
    # The file is not present.
    raise MappingError('%s is missing' % filepath)
  is_link = stat.S_ISLNK(filestats.st_mode)

  if flavor != 'win':
    # Ignore file mode on Windows since it's not really useful there.
    filemode = stat.S_IMODE(filestats.st_mode)
    # Remove write access for group and all access to 'others'.
    filemode &= ~(stat.S_IWGRP | stat.S_IRWXO)
    if read_only:
      filemode &= ~stat.S_IWUSR
    # Keep the group execute bit in sync with the owner's execute bit.
    if filemode & stat.S_IXUSR:
      filemode |= stat.S_IXGRP
    else:
      filemode &= ~stat.S_IXGRP
    if not is_link:
      out['m'] = filemode

  # Used to skip recalculating the hash or link destination. Use the most recent
  # update time.
  # TODO(maruel): Save it in the .state file instead of .isolated so the
  # .isolated file is deterministic.
  out['t'] = int(round(filestats.st_mtime))

  if not is_link:
    out['s'] = filestats.st_size
    # If the timestamp wasn't updated and the file size is still the same, carry
    # on the sha-1.
    if (prevdict.get('t') == out['t'] and
        prevdict.get('s') == out['s']):
      # Reuse the previous hash if available.
      out['h'] = prevdict.get('h')
    if not out.get('h'):
      out['h'] = hash_file(filepath, algo)
  else:
    # If the timestamp wasn't updated, carry on the link destination.
    if prevdict.get('t') == out['t']:
      # Reuse the previous link destination if available.
      out['l'] = prevdict.get('l')
    if out.get('l') is None:
      # The link could be in an incorrect path case. In practice, this only
      # happen on OSX on case insensitive HFS.
      # TODO(maruel): It'd be better if it was only done once, in
      # expand_directory_and_symlink(), so it would not be necessary to do again
      # here.
      symlink_value = os.readlink(filepath)  # pylint: disable=E1101
      filedir = file_path.get_native_path_case(os.path.dirname(filepath))
      native_dest = file_path.fix_native_path_case(filedir, symlink_value)
      out['l'] = os.path.relpath(native_dest, filedir)
  return out
1448
1449
def save_isolated(isolated, data):
  """Writes |data| out as the .isolated file named |isolated|.

  Returns the list of child .isolated files referenced by |isolated|. This
  reference implementation never produces children, so the list is always
  empty.
  """
  # Round-trip |data| through load_isolated() to validate it before writing.
  hash_algo = SUPPORTED_ALGOS[data['algo']]
  load_isolated(json.dumps(data), data.get('flavor'), hash_algo)
  tools.write_json(isolated, data, True)
  return []
1463
1464
1465
def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The base url, it is assumed that |base_url|/has/ can be used to
              query if an element was already uploaded, and |base_url|/store/
              can be used to upload a new element.
    indir: Root directory the infiles are based in.
    infiles: dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server.

  Returns:
    0. The storage context manager takes care of connection teardown.
  """
  with get_storage(base_url, namespace) as storage:
    storage.upload_tree(indir, infiles)
  return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001480
1481
def load_isolated(content, os_flavor, algo):
  """Verifies the .isolated file is valid and loads this object with the json
  data.

  Arguments:
  - content: raw serialized content to load.
  - os_flavor: OS to load this file on. Optional.
  - algo: hashlib algorithm class. Used to confirm the algorithm matches the
          algorithm used on the Isolate Server.

  Returns:
    The decoded json dict, with file paths converted to the native
    os.path.sep.

  Raises:
    ConfigError if |content| is not valid .isolated data.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  # Check 'version' first, since it could modify the parsing after.
  # TODO(maruel): Drop support for unversioned .isolated file around Jan 2014.
  value = data.get('version', ISOLATED_FILE_VERSION)
  if not isinstance(value, basestring):
    raise ConfigError('Expected string, got %r' % value)
  if not re.match(r'^(\d+)\.(\d+)$', value):
    raise ConfigError('Expected a compatible version, got %r' % value)
  # Only the major version number must match; a minor version bump is
  # considered compatible.
  if value.split('.', 1)[0] != ISOLATED_FILE_VERSION.split('.', 1)[0]:
    raise ConfigError(
        'Expected compatible \'%s\' version, got %r' %
        (ISOLATED_FILE_VERSION, value))

  if algo is None:
    # TODO(maruel): Remove the default around Jan 2014.
    # Default the algorithm used in the .isolated file itself, falls back to
    # 'sha-1' if unspecified.
    algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]

  for key, value in data.iteritems():
    if key == 'algo':
      # Hash algorithm name; must be supported and must match |algo|.
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
      if value not in SUPPORTED_ALGOS:
        raise ConfigError(
            'Expected one of \'%s\', got %r' %
            (', '.join(sorted(SUPPORTED_ALGOS)), value))
      if value != SUPPORTED_ALGOS_REVERSE[algo]:
        raise ConfigError(
            'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))

    elif key == 'command':
      # Command to run, as a non-empty list of strings.
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      # Map of file path -> properties dict. Recognized properties:
      #   'h': content hash, 's': size, 'l': link destination, 'm': mode.
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            if not is_valid_hash(subsubvalue, algo):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            if not isinstance(subsubvalue, (int, long)):
              raise ConfigError('Expected int or long, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        # An entry is either a regular file ('h' and 's' both present) or a
        # link ('l' present, without size or mode); the combinations are
        # mutually exclusive.
        if bool('h' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
              subvalue)
        if bool('h' in subvalue) != bool('s' in subvalue):
          raise ConfigError(
              'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
              subvalue)
        if bool('s' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
              subvalue)
        if bool('l' in subvalue) and bool('m' in subvalue):
          raise ConfigError(
              'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      # Hashes of other .isolated files to include, in priority order.
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not is_valid_hash(subvalue, algo):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not isinstance(value, bool):
        raise ConfigError('Expected bool, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      # An 'os' entry only fails validation when it contradicts |os_flavor|.
      if os_flavor and value != os_flavor:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (os_flavor, value))

    elif key == 'version':
      # Already checked above.
      pass

    else:
      raise ConfigError('Unknown key %r' % key)

  # Automatically fix os.path.sep if necessary. While .isolated files are
  # always in the native path format, someone could want to download an
  # .isolated tree from another OS.
  wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
  if 'files' in data:
    data['files'] = dict(
        (k.replace(wrong_path_sep, os.path.sep), v)
        for k, v in data['files'].iteritems())
    for v in data['files'].itervalues():
      if 'l' in v:
        v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
  if 'relative_cwd' in data:
    data['relative_cwd'] = data['relative_cwd'].replace(
        wrong_path_sep, os.path.sep)
  return data
1624
1625
class IsolatedFile(object):
  """One parsed .isolated file within the tree of included .isolated files."""

  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)' % obj_hash)
    self.obj_hash = obj_hash
    self.algo = algo
    # Flipped once everything to the 'left' of this node in the include tree
    # was parsed. 'Tree' here means the .isolate and all the .isolated files
    # recursively included by it through the 'includes' key. The order of the
    # sha-1s in 'includes' matters: a later entry is not processed until the
    # earlier ones were retrieved and read.
    self.can_fetch = False

    # Decoded content of the .isolated file.
    self.data = {}
    # One IsolatedFile instance per entry of the 'includes' key in self.data.
    self.children = []

    # True once load() parsed the content.
    self._is_parsed = False
    # True once fetch_files() enqueued every file listed by this node.
    self.files_fetched = False

  def load(self, os_flavor, content):
    """Parses |content| as .isolated data into this object.

    Asserts if called more than once.
    """
    logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    includes = self.data.get('includes', [])
    self.children = [IsolatedFile(digest, self.algo) for digest in includes]
    self._is_parsed = True

  def fetch_files(self, fetch_queue, files):
    """Merges this node's files into |files| and preemptively requests them.

    Entries already present in |files| win: the root isolated has priority on
    the files being mapped, so overridden files must not be fetched.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if not self._is_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      if filepath in files:
        # Already mapped by a higher-priority .isolated file.
        continue
      files[filepath] = properties
      if 'h' in properties:
        # Preemptively request files.
        logging.debug('fetching %s' % filepath)
        fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
    self.files_fetched = True
1683
1684
class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    # Command to run, taken from the highest-priority .isolated file that
    # specifies one.
    self.command = []
    # Aggregated 'files' sections: file path -> properties dict.
    self.files = {}
    # Whether the mapped files should be read-only; defaults to False after
    # load() when no .isolated file specified it.
    self.read_only = None
    # Directory to run the command from, relative to the output directory;
    # defaults to '' after load() when unspecified.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      # Enqueues one .isolated file for fetching, refusing include cycles.
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(WorkerPool.HIGH, h)

    retrieve(self.root)

    while pending:
      # Block until any pending .isolated file is fetched, then parse it and
      # enqueue its own includes.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    def check(n):
      # Sanity check: every node in the tree must have fetched its files.
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False

  def _traverse_tree(self, fetch_queue, node):
    """Fetches files of every fetchable node, walking the tree left to right.

    At most one not-yet-fetchable child per node is marked fetchable per pass,
    enforcing the strict left-to-right processing order of 'includes'.
    """
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    """Fetches |node|'s files and merges its properties into self.

    Properties already set by an earlier (higher priority) node are kept.
    """
    node.fetch_files(fetch_queue, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1780
1781
def fetch_isolated(
    isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    algo: hash algorithm to use.
    outdir: Output directory to map file tree to.
    os_flavor: OS flavor to choose when reading sections of *.isolated file.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about loaded *.isolated file.

  Raises:
    ConfigError: if |require_command| is True and no command is specified.
    MappingError: if the cache evicted items needed for the mapping.
  """
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not is_valid_hash(isolated_hash, algo):
        isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)

      # Load all *.isolated and start loading rest of the files.
      settings.load(fetch_queue, isolated_hash, os_flavor, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise ConfigError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_symlinks(outdir, settings.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in settings.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
                   fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

      # Cache could evict some items we just tried to fetch, it's a fatal error.
      if not fetch_queue.verify_all_cached():
        raise MappingError('Cache is too small to hold all requested files')
  return settings
1860
1861
def directory_to_metadata(root, algo, blacklist):
  """Walks a directory and builds the upload list plus .isolated metadata.

  Returns a (items, metadata) tuple where |items| is the list of FileItem to
  upload and |metadata| is the 'files' section of a .isolated file describing
  the directory content.
  """
  root = file_path.get_native_path_case(root)
  metadata = {}
  for relpath in expand_directory_and_symlink(root, './', blacklist, True):
    metadata[relpath] = process_input(
        os.path.join(root, relpath), {}, False, sys.platform, algo)
  # The timestamp ('t') is not deterministic; strip it from the metadata.
  for props in metadata.itervalues():
    props.pop('t')
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      # Entries without a content hash (links) have nothing to upload.
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            is_isolated=relpath.endswith('.isolated')))
  return items, metadata
1882
1883
def archive(storage, algo, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: Storage instance used to upload the items.
    algo: hashlib algorithm class used to hash content.
    files: list of unicode paths; each entry is either a file or a directory.
           A directory is archived as a generated .isolated file plus its
           content.
    blacklist: filter passed down to directory_to_metadata() when walking a
               directory.

  Returns:
    List of (hash, path) pairs, one per entry in |files|. For a directory,
    the hash is the one of the generated .isolated file.

  Raises:
    Error: on duplicate entries or when an entry cannot be processed.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(filepath, algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
              'algo': SUPPORTED_ALGOS_REVERSE[algo],
              'files': metadata,
              'version': ISOLATED_FILE_VERSION,
          }
          save_isolated(isolated, data)
          h = hash_file(isolated, algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  is_isolated=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = hash_file(filepath, algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  is_isolated=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about the uploaded files but we don't much in
    # practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    # Clean up the generated .isolated files, if any.
    if tempdir:
      shutil.rmtree(tempdir)
1944
1945
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)

  if files == ['-']:
    # One path per line on stdin. readlines() keeps the trailing newline of
    # each line, which would make every os.path.isdir()/isfile() check in
    # archive() fail, so strip line terminators and drop empty lines.
    files = [f for f in (l.rstrip('\r\n') for l in sys.stdin) if f]

  if not files:
    parser.error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  algo = get_hash_algo(options.namespace)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(options.isolate_server, options.namespace) as storage:
      results = archive(storage, algo, files, blacklist)
  except Error as e:
    # Surface archiving failures as a command line error.
    parser.error(e.args[0])
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001981
1982
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)

  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            WorkerPool.MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      settings = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=MemoryCache(),
          algo=get_hash_algo(options.namespace),
          outdir=options.target,
          os_flavor=None,
          require_command=False)
      # |rel| is already anchored at |options.target|; the previous code
      # joined |options.target| in a second time, which was redundant (and
      # fragile if the join ever stopped collapsing the duplicate prefix).
      rel = os.path.join(options.target, settings.relative_cwd)
      print('To run this test please run from the directory %s:' % rel)
      print(' ' + ' '.join(settings.command))

  return 0
2041
2042
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser carrying the common isolate server options."""

  def __init__(self, **kwargs):
    # Note: the base class is a classic optparse parser, so no super() here.
    tools.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    self.add_option(
        '-I', '--isolate-server',
        metavar='URL', default='',
        help='Isolate server to use')
    self.add_option(
        '--namespace', default='default-gzip',
        help='The namespace to use on the server, default: %default')

  def parse_args(self, *args, **kwargs):
    """Parses arguments, then validates that --isolate-server was given."""
    options, leftover = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Normalize so 'http://host/' and 'http://host' behave the same.
    options.isolate_server = options.isolate_server.rstrip('/')
    if not options.isolate_server:
      self.error('--isolate-server is required.')
    return options, leftover
2065
2066
def main(args):
  """Dispatches to the subcommand named in |args|; returns the exit code."""
  try:
    return subcommand.CommandDispatcher(__name__).execute(
        OptionParserIsolateServer(), args)
  except Exception as e:
    # Report any unhandled exception and convert it into a failure exit code.
    tools.report_error(e)
    return 1
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002074
2075
if __name__ == '__main__':
  # Console setup must happen before any output is produced.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))