#!/usr/bin/env python
# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.

"""Archives a set of files or directories to a server."""

__version__ = '0.3.2'

import functools
import hashlib
import json
import logging
import os
import re
import shutil
import stat
import sys
import tempfile
import threading
import time
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import net
from utils import threading_utils
from utils import tools

import auth


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'
# Version stored and expected in .isolated files.
ISOLATED_FILE_VERSION = '1.3'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]
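

# Illustrative sketch (not in the original module): how the batch size schedule
# above plays out. Storage.batch_items_for_check() below implements the real
# logic; this helper only computes the sequence of batch sizes for |num_items|.
def _example_contains_batch_schedule(num_items):
  """Returns the list of '/pre-upload' batch sizes used for |num_items|."""
  sizes = []
  batch_count = 0
  remaining = num_items
  while remaining > 0:
    limit = ITEMS_PER_CONTAINS_QUERIES[
        min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    sizes.append(min(limit, remaining))
    remaining -= sizes[-1]
    batch_count += 1
  return sizes
# e.g. _example_contains_batch_schedule(300) == [20, 20, 50, 50, 50, 100, 10]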


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
    'md5': hashlib.md5,
    'sha-1': hashlib.sha1,
    'sha-512': hashlib.sha512,
}


# Used for serialization.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())
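

# Illustrative sketch (not in the original module): round-tripping an algorithm
# through SUPPORTED_ALGOS and SUPPORTED_ALGOS_REVERSE, as done when serializing
# the hash algorithm name of a .isolated file.
def _example_algo_serialization():
  """Maps a wire name to a hashlib class and back."""
  algo = SUPPORTED_ALGOS['sha-1']       # -> hashlib.sha1
  name = SUPPORTED_ALGOS_REVERSE[algo]  # -> 'sha-1'
  assert name == 'sha-1'
  # A sha-1 hex digest is 2 * digest_size = 40 characters.
  assert 2 * algo().digest_size == 40
  return name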


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
    r'^.+\.(?:run_test_cases)$',
    r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
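

# Illustrative sketch (not in the original module): compiling the blacklist
# regexps above into the callable form that expand_directory_and_symlink()
# below expects. How the real client wires this up is not shown in this file.
def _example_blacklist_predicate():
  """Returns a function mapping a relative path to True if blacklisted."""
  compiled = [re.compile(regexp) for regexp in DEFAULT_BLACKLIST]
  return lambda relpath: any(c.match(relpath) for c in compiled)
# e.g. _example_blacklist_predicate()('foo.pyc') is True.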


class Error(Exception):
  """Generic runtime error."""
  pass


class ConfigError(ValueError):
  """Generic failure to load a .isolated file."""
  pass


class MappingError(OSError):
  """Failed to recreate the tree."""
  pass


def is_valid_hash(value, algo):
  """Returns True if the value is a valid hex hash for the given algorithm."""
  size = 2 * algo().digest_size
  return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))


def hash_file(filepath, algo):
  """Calculates the hash of a file without reading it all in memory at once.

  |algo| should be one of the hashlib hashing algorithms.
  """
  digest = algo()
  with open(filepath, 'rb') as f:
    while True:
      chunk = f.read(DISK_FILE_CHUNK)
      if not chunk:
        break
      digest.update(chunk)
  return digest.hexdigest()
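

# Illustrative sketch (not in the original module): hashing a dependency the
# same way the uploader does; the path is hypothetical.
def _example_hash_file():
  """Computes a file's sha-1 hex digest in constant memory."""
  digest = hash_file('some/local/file.bin', hashlib.sha1)
  assert is_valid_hash(digest, hashlib.sha1)
  return digest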


def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    data = stream.read(chunk_size)
    if not data:
      break
    yield data


def file_read(filepath, chunk_size=DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  if not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
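

# Illustrative sketch (not in the original module): a compress/decompress round
# trip through the two generators above, as used for '-gzip' namespaces. The
# tiny chunk_size exercises the unconsumed_tail loop in zip_decompress().
def _example_zip_round_trip():
  """Verifies that zip_compress() and zip_decompress() are inverses."""
  chunks = ['hello ', 'isolate ', 'server']
  compressed = zip_compress(iter(chunks), level=7)
  decompressed = ''.join(zip_decompress(compressed, chunk_size=4))
  assert decompressed == 'hello isolate server'
  return decompressed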


def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # Strip the leading dot returned by os.path.splitext() so the extension
  # actually matches the entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Builds the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      os.mkdir(os.path.join(base_directory, d))


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101


def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), actual_size, size)
    return False
  return True


class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  RETRIES = 5

  def __init__(self):
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')


class Item(object):
  """An item to push to Storage.

  It starts its life in a main thread, travels to 'contains' thread, then to
  'push' thread and then finally back to the main thread.

  It is never used concurrently from multiple threads.
  """

  def __init__(self, digest, size, is_isolated=False):
    self.digest = digest
    self.size = size
    self.is_isolated = is_isolated
    self.compression_level = 6
    self.push_state = None

  def content(self, chunk_size):
    """Iterable with content of this item in chunks of given size.

    Arguments:
      chunk_size: preferred size of the chunk to produce, may be ignored.
    """
    raise NotImplementedError()


class FileItem(Item):
  """A file to push to Storage."""

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self, chunk_size):
    return file_read(self.path, chunk_size)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, is_isolated=False):
    super(BufferItem, self).__init__(
        algo(buf).hexdigest(), len(buf), is_isolated)
    self.buffer = buf

  def content(self, _chunk_size):
    return [self.buffer]
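

# Illustrative sketch (not in the original module): uploading an in-memory
# buffer. The server URL is hypothetical and the call performs real network
# I/O; upload_items() is defined on Storage below.
def _example_upload_buffer():
  """Pushes a single BufferItem, returning the list of items uploaded."""
  with get_storage('https://isolate.example.com', 'default-gzip') as storage:
    item = BufferItem('some content', get_hash_algo('default-gzip'))
    return storage.upload_items([item])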


class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi."""

  def __init__(self, storage_api, use_zip):
    self.use_zip = use_zip
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    return False

  def upload_tree(self, indir, infiles):
    """Uploads the given tree to the isolate server.

    Arguments:
      indir: root directory the infiles are based in.
      infiles: dict of files to upload from |indir|.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_tree(indir=%s, files=%d)', indir, len(infiles))

    # Convert |indir| + |infiles| into a list of FileItem objects.
    # Filter out symlinks, since they are not represented by items on isolate
    # server side.
    items = [
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            is_isolated=metadata.get('priority') == '0')
        for filepath, metadata in infiles.iteritems()
        if 'l' not in metadata
    ]

    return self.upload_items(items)

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    Will upload only items that are missing.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    channel = threading_utils.TaskChannel()
    for missing_item in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(
          channel,
          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
          missing_item)

    uploaded = []
    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if underlying protocol doesn't support this.
    """
    return self._storage_api.get_fetch_url(digest)

  def async_push(self, channel, priority, item):
    """Starts asynchronous push to the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      priority: thread pool task priority for the push.
      item: item to upload as instance of Item class.
    """
    def push(content):
      """Pushes an item and returns its id, to pass as a result to |channel|."""
      self._storage_api.push(item, content)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push,
          item.content(DISK_FILE_CHUNK))
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
                              item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self.use_zip:
          stream = zip_decompress(stream, DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      Item objects that are missing from the server.
    """
    channel = threading_utils.TaskChannel()
    pending = 0
    # Enqueue all requests.
    for batch in self.batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1
    # Yield results as they come in.
    for _ in xrange(pending):
      for missing in channel.pull():
        yield missing

  @staticmethod
  def batch_items_for_check(items):
    """Splits list of items to check for existence on the server into batches.

    Each batch corresponds to a single 'exists?' query to the server via a call
    to StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects.

    Yields:
      Batches of items to query for existence in a single operation,
      each batch is a list of Item objects.
    """
    batch_count = 0
    batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
    next_queries = []
    for item in sorted(items, key=lambda x: x.size, reverse=True):
      next_queries.append(item)
      if len(next_queries) == batch_size_limit:
        yield next_queries
        next_queries = []
        batch_count += 1
        batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    if next_queries:
      yield next_queries


class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
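

def _example_fetch_queue_usage(server_url, isolated_digest):
  """Illustrative sketch (not in the original module) of FetchQueue usage.

  |server_url| and |isolated_digest| are hypothetical inputs; the fetched
  bytes land in the cache via cache.write().
  """
  with get_storage(server_url, 'default-gzip') as storage:
    cache = MemoryCache()
    queue = FetchQueue(storage, cache)
    queue.add(WorkerPool.HIGH, isolated_digest)
    queue.wait([isolated_digest])  # Blocks until the digest is fetched.
    assert queue.verify_all_cached()
    return cache.read(isolated_digest)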


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before it reaches the LocalCache."""

  def __init__(self, stream, expected_size):
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise MappingError('Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
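

# Illustrative sketch (not in the original module): how Storage.async_fetch()
# above runs a fetched stream through FetchStreamVerifier before the sink.
def _example_verified_stream():
  """Runs a 12-byte stream through the verifier."""
  chunks = ['hello ', 'world!']  # 12 bytes total.
  verifier = FetchStreamVerifier(iter(chunks), 12)
  data = ''.join(verifier.run())  # Raises IOError on a size mismatch.
  assert data == 'hello world!'
  return data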


class StorageApi(object):
  """Interface for classes that implement low-level storage operations."""

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, content):
    """Uploads an |item| with content generated by |content| generator.

    Arguments:
      item: Item object that holds information about an item being pushed.
      content: a generator that yields chunks to push.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for existence of given |items| on the server.

    Mutates |items| by assigning an opaque implementation-specific object to
    Item's push_state attribute on missing entries in the datastore.

    Arguments:
      items: list of Item objects.

    Returns:
      A list of items missing on server as a list of Item objects.
    """
    raise NotImplementedError()


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  """

  class _PushState(object):
    """State needed to call .push(), to be stored in Item.push_state."""
    def __init__(self, upload_url, finalize_url):
      self.upload_url = upload_url
      self.finalize_url = finalize_url
      self.uploaded = False
      self.finalized = False

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self.base_url = base_url.rstrip('/')
    self.namespace = namespace
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if self._server_caps is None:
        request_body = json.dumps(
            self._generate_handshake_request(), separators=(',', ':'))
        response = net.url_read(
            url=self.base_url + '/content-gs/handshake',
            data=request_body,
            content_type='application/json',
            method='POST')
        if response is None:
          raise MappingError('Failed to perform handshake.')
        try:
          caps = json.loads(response)
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise MappingError('Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  def get_fetch_url(self, digest):
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self.base_url, self.namespace, digest)

  def fetch(self, digest, offset=0):
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        source_url,
        retry_404=True,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, content):
    assert isinstance(item, Item)
    assert isinstance(item.push_state, IsolateServer._PushState)
    assert not item.push_state.finalized

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be rewound back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not item.push_state.uploaded:
      # A cheesy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=item.push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, item.push_state.upload_url))
      item.push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if item.push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      response = net.url_read(
          url=item.push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      item.push_state.finalized = True

  def contains(self, items):
    logging.info('Checking existence of %d files...', len(items))

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.is_isolated),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self.base_url,
        self.namespace,
        urllib.quote(self._server_capabilities['access_token']))
    response_body = net.url_read(
        url=query_url,
        data=json.dumps(body, separators=(',', ':')),
        content_type='application/json',
        method='POST')
    if response_body is None:
      raise MappingError('Failed to execute /pre-upload query')

    # Response body is a list of push_urls (or null if file is already present).
    try:
      response = json.loads(response_body)
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response_body))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = []
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        item = items[i]
        assert item.push_state is None
        item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
        missing_items.append(item)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items


class FileSystem(StorageApi):
  """StorageApi implementation that fetches data from the file system.

  The common use case is a NFS/CIFS file server that is mounted locally and
  used to fetch files onto a local partition.
  """

  def __init__(self, base_path):
    super(FileSystem, self).__init__()
    self.base_path = base_path

  def get_fetch_url(self, digest):
    return None

  def fetch(self, digest, offset=0):
    assert isinstance(digest, basestring)
    return file_read(os.path.join(self.base_path, digest), offset=offset)

  def push(self, item, content):
    assert isinstance(item, Item)
    file_write(os.path.join(self.base_path, item.digest), content)

  def contains(self, items):
    return [
        item for item in items
        if not os.path.exists(os.path.join(self.base_path, item.digest))
    ]
1089
1090
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001091class LocalCache(object):
1092 """Local cache that stores objects fetched via Storage.
1093
1094 It can be accessed concurrently from multiple threads, so it should protect
1095 its internal state with some lock.
1096 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001097 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001098
1099 def __enter__(self):
1100 """Context manager interface."""
1101 return self
1102
1103 def __exit__(self, _exc_type, _exec_value, _traceback):
1104 """Context manager interface."""
1105 return False
1106
1107 def cached_set(self):
1108 """Returns a set of all cached digests (always a new object)."""
1109 raise NotImplementedError()
1110
1111 def touch(self, digest, size):
1112 """Ensures item is not corrupted and updates its LRU position.
1113
1114 Arguments:
1115 digest: hash digest of item to check.
1116 size: expected size of this item.
1117
1118 Returns:
1119 True if item is in cache and not corrupted.
1120 """
1121 raise NotImplementedError()
1122
1123 def evict(self, digest):
1124 """Removes item from cache if it's there."""
1125 raise NotImplementedError()
1126
1127 def read(self, digest):
1128 """Returns contents of the cached item as a single str."""
1129 raise NotImplementedError()
1130
1131 def write(self, digest, content):
1132 """Reads data from |content| generator and stores it in cache."""
1133 raise NotImplementedError()
1134
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001135 def hardlink(self, digest, dest, file_mode):
1136 """Ensures file at |dest| has same content as cached |digest|.
1137
1138 If file_mode is provided, it is used to set the executable bit if
1139 applicable.
1140 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001141 raise NotImplementedError()
1142
1143
1144class MemoryCache(LocalCache):
1145 """LocalCache implementation that stores everything in memory."""
1146
1147 def __init__(self):
1148 super(MemoryCache, self).__init__()
1149 # Let's not assume dict is thread safe.
1150 self._lock = threading.Lock()
1151 self._contents = {}
1152
1153 def cached_set(self):
1154 with self._lock:
1155 return set(self._contents)
1156
1157 def touch(self, digest, size):
1158 with self._lock:
1159 return digest in self._contents
1160
1161 def evict(self, digest):
1162 with self._lock:
1163 self._contents.pop(digest, None)
1164
1165 def read(self, digest):
1166 with self._lock:
1167 return self._contents[digest]
1168
1169 def write(self, digest, content):
1170 # Assemble whole stream before taking the lock.
1171 data = ''.join(content)
1172 with self._lock:
1173 self._contents[digest] = data
1174
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001175 def hardlink(self, digest, dest, file_mode):
1176 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001177 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001178 if file_mode is not None:
1179 # Ignores all other bits.
1180 os.chmod(dest, file_mode & 0500)


def get_hash_algo(_namespace):
  """Return hash algorithm class to use when uploading to given |namespace|."""
  # TODO(vadimsh): Implement this at some point.
  return hashlib.sha1


def is_namespace_with_compression(namespace):
  """Returns True if given |namespace| stores compressed objects."""
  return namespace.endswith(('-gzip', '-deflate'))


def get_storage_api(file_or_url, namespace):
  """Returns an object that implements StorageApi interface."""
  if file_path.is_url(file_or_url):
    return IsolateServer(file_or_url, namespace)
  else:
    return FileSystem(file_or_url)


def get_storage(file_or_url, namespace):
  """Returns Storage class configured with appropriate StorageApi instance."""
  return Storage(
      get_storage_api(file_or_url, namespace),
      is_namespace_with_compression(namespace))

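
# Illustrative sketch (not in the original module): the two storage backends
# behind the same Storage facade. The URL and path are hypothetical.
def _example_get_storage():
  """Shows that a URL selects IsolateServer and a path selects FileSystem."""
  # Compressed namespace: content is zipped before hitting the network.
  with get_storage('https://isolate.example.com', 'default-gzip') as remote:
    assert remote.use_zip
  # A local directory (e.g. an NFS mount) used as storage; no compression.
  with get_storage('/mnt/isolate-cache', 'default') as local:
    assert not local.use_zip
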
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001208
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001209def expand_symlinks(indir, relfile):
1210 """Follows symlinks in |relfile|, but treating symlinks that point outside the
1211 build tree as if they were ordinary directories/files. Returns the final
1212 symlink-free target and a list of paths to symlinks encountered in the
1213 process.
1214
1215 The rule about symlinks outside the build tree is for the benefit of the
1216 Chromium OS ebuild, which symlinks the output directory to an unrelated path
1217 in the chroot.
1218
1219 Fails when a directory loop is detected, although in theory we could support
1220 that case.
1221 """
1222 is_directory = relfile.endswith(os.path.sep)
1223 done = indir
1224 todo = relfile.strip(os.path.sep)
1225 symlinks = []
1226
1227 while todo:
1228 pre_symlink, symlink, post_symlink = file_path.split_at_symlink(
1229 done, todo)
1230 if not symlink:
1231 todo = file_path.fix_native_path_case(done, todo)
1232 done = os.path.join(done, todo)
1233 break
1234 symlink_path = os.path.join(done, pre_symlink, symlink)
1235 post_symlink = post_symlink.lstrip(os.path.sep)
1236 # readlink doesn't exist on Windows.
1237 # pylint: disable=E1101
1238 target = os.path.normpath(os.path.join(done, pre_symlink))
1239 symlink_target = os.readlink(symlink_path)
1240 if os.path.isabs(symlink_target):
1241 # Absolute path are considered a normal directories. The use case is
1242 # generally someone who puts the output directory on a separate drive.
1243 target = symlink_target
1244 else:
1245 # The symlink itself could be using the wrong path case.
1246 target = file_path.fix_native_path_case(target, symlink_target)
1247
1248 if not os.path.exists(target):
1249 raise MappingError(
1250 'Symlink target doesn\'t exist: %s -> %s' % (symlink_path, target))
1251 target = file_path.get_native_path_case(target)
1252 if not file_path.path_starts_with(indir, target):
1253 done = symlink_path
1254 todo = post_symlink
1255 continue
1256 if file_path.path_starts_with(target, symlink_path):
1257 raise MappingError(
1258 'Can\'t map recursive symlink reference %s -> %s' %
1259 (symlink_path, target))
1260 logging.info('Found symlink: %s -> %s', symlink_path, target)
1261 symlinks.append(os.path.relpath(symlink_path, indir))
1262 # Treat the common prefix of the old and new paths as done, and start
1263 # scanning again.
1264 target = target.split(os.path.sep)
1265 symlink_path = symlink_path.split(os.path.sep)
1266 prefix_length = 0
1267 for target_piece, symlink_path_piece in zip(target, symlink_path):
1268 if target_piece == symlink_path_piece:
1269 prefix_length += 1
1270 else:
1271 break
1272 done = os.path.sep.join(target[:prefix_length])
1273 todo = os.path.join(
1274 os.path.sep.join(target[prefix_length:]), post_symlink)
1275
1276 relfile = os.path.relpath(done, indir)
1277 relfile = relfile.rstrip(os.path.sep) + is_directory * os.path.sep
1278 return relfile, symlinks
1279
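# Illustrative sketch (hypothetical paths): with indir='/src' and
# '/src/out/Release' being a symlink to '/src/build/Release',
#   expand_symlinks('/src', 'out/Release/foo.bin')
# returns ('build/Release/foo.bin', ['out/Release']). A symlink whose target
# lives outside '/src' is instead traversed as if it were a plain directory.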
1280
1281def expand_directory_and_symlink(indir, relfile, blacklist, follow_symlinks):
1282 """Expands a single input. It can result in multiple outputs.
1283
1284 This function is recursive when relfile is a directory.
1285
1286  Note: this code doesn't properly handle recursive symlinks like one created
1287 with:
1288 ln -s .. foo
1289 """
1290 if os.path.isabs(relfile):
1291 raise MappingError('Can\'t map absolute path %s' % relfile)
1292
1293 infile = file_path.normpath(os.path.join(indir, relfile))
1294 if not infile.startswith(indir):
1295 raise MappingError('Can\'t map file %s outside %s' % (infile, indir))
1296
1297 filepath = os.path.join(indir, relfile)
1298 native_filepath = file_path.get_native_path_case(filepath)
1299 if filepath != native_filepath:
1300 # Special case './'.
1301 if filepath != native_filepath + '.' + os.path.sep:
1302 # Give up enforcing strict path case on OSX. Really, it's that sad. The
1303 # case where it happens is very specific and hard to reproduce:
1304 # get_native_path_case(
1305 # u'Foo.framework/Versions/A/Resources/Something.nib') will return
1306 # u'Foo.framework/Versions/A/resources/Something.nib', e.g. lowercase 'r'.
1307 #
1308 # Note that this is really something deep in OSX because running
1309 # ls Foo.framework/Versions/A
1310 # will print out 'Resources', while file_path.get_native_path_case()
1311 # returns a lower case 'r'.
1312 #
1313      # So *something* is happening under the hood causing the command 'ls'
1314 # and Carbon.File.FSPathMakeRef('path').FSRefMakePath() to disagree. We
1315 # have no idea why.
1316 if sys.platform != 'darwin':
1317 raise MappingError(
1318 'File path doesn\'t equal native file path\n%s != %s' %
1319 (filepath, native_filepath))
1320
1321 symlinks = []
1322 if follow_symlinks:
1323 relfile, symlinks = expand_symlinks(indir, relfile)
1324
1325 if relfile.endswith(os.path.sep):
1326 if not os.path.isdir(infile):
1327 raise MappingError(
1328 '%s is not a directory but ends with "%s"' % (infile, os.path.sep))
1329
1330 # Special case './'.
1331 if relfile.startswith('.' + os.path.sep):
1332 relfile = relfile[2:]
1333 outfiles = symlinks
1334 try:
1335 for filename in os.listdir(infile):
1336 inner_relfile = os.path.join(relfile, filename)
1337 if blacklist and blacklist(inner_relfile):
1338 continue
1339 if os.path.isdir(os.path.join(indir, inner_relfile)):
1340 inner_relfile += os.path.sep
1341 outfiles.extend(
1342 expand_directory_and_symlink(indir, inner_relfile, blacklist,
1343 follow_symlinks))
1344 return outfiles
1345 except OSError as e:
1346 raise MappingError(
1347 'Unable to iterate over directory %s.\n%s' % (infile, e))
1348 else:
1349 # Always add individual files even if they were blacklisted.
1350 if os.path.isdir(infile):
1351 raise MappingError(
1352 'Input directory %s must have a trailing slash' % infile)
1353
1354 if not os.path.isfile(infile):
1355 raise MappingError('Input file %s doesn\'t exist' % infile)
1356
1357 return symlinks + [relfile]
1358
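# Illustrative sketch (hypothetical tree): expanding a directory entry
# recursively yields every non-blacklisted file under it, e.g.
#   expand_directory_and_symlink('/src', 'data/', None, True)
#   -> ['data/a.txt', 'data/sub/b.txt']
# while expanding a single file yields just that file, even if blacklisted.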
1359
1360def process_input(filepath, prevdict, read_only, flavor, algo):
1361  """Processes an input file, a dependency, and returns metadata about it.
1362
1363 Behaviors:
1364 - Retrieves the file mode, file size, file timestamp, file link
1365      destination if it is a file link and calculates the SHA-1 of the file's
1366 content if the path points to a file and not a symlink.
1367
1368 Arguments:
1369 filepath: File to act on.
1370 prevdict: the previous dictionary. It is used to retrieve the cached sha-1
1371 to skip recalculating the hash. Optional.
Marc-Antoine Ruel7124e392014-01-09 11:49:21 -05001372    read_only: If 1 or 2, the file mode is manipulated. In practice, only one
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001373               of 4 modes is saved: 0755 (rwx), 0644 (rw), 0555 (rx), 0444 (r). On
1374 windows, mode is not set since all files are 'executable' by
1375 default.
1376 flavor: One isolated flavor, like 'linux', 'mac' or 'win'.
1377 algo: Hashing algorithm used.
1378
1379 Returns:
1380    The necessary data to create an entry in the 'files' section of an .isolated
1381 file.
1382 """
1383 out = {}
1384 # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
1385 # if prevdict.get('T') == True:
1386 # # The file's content is ignored. Skip the time and hard code mode.
1387 # if get_flavor() != 'win':
1388 # out['m'] = stat.S_IRUSR | stat.S_IRGRP
1389 # out['s'] = 0
1390 # out['h'] = algo().hexdigest()
1391 # out['T'] = True
1392 # return out
1393
1394 # Always check the file stat and check if it is a link. The timestamp is used
1395 # to know if the file's content/symlink destination should be looked into.
1396 # E.g. only reuse from prevdict if the timestamp hasn't changed.
1397 # There is the risk of the file's timestamp being reset to its last value
1398 # manually while its content changed. We don't protect against that use case.
1399 try:
1400 filestats = os.lstat(filepath)
1401 except OSError:
1402 # The file is not present.
1403 raise MappingError('%s is missing' % filepath)
1404 is_link = stat.S_ISLNK(filestats.st_mode)
1405
1406 if flavor != 'win':
1407 # Ignore file mode on Windows since it's not really useful there.
1408 filemode = stat.S_IMODE(filestats.st_mode)
1409 # Remove write access for group and all access to 'others'.
1410 filemode &= ~(stat.S_IWGRP | stat.S_IRWXO)
1411 if read_only:
1412 filemode &= ~stat.S_IWUSR
1413 if filemode & stat.S_IXUSR:
1414 filemode |= stat.S_IXGRP
1415 else:
1416 filemode &= ~stat.S_IXGRP
1417 if not is_link:
1418 out['m'] = filemode
1419
1420 # Used to skip recalculating the hash or link destination. Use the most recent
1421 # update time.
1422 # TODO(maruel): Save it in the .state file instead of .isolated so the
1423 # .isolated file is deterministic.
1424 out['t'] = int(round(filestats.st_mtime))
1425
1426 if not is_link:
1427 out['s'] = filestats.st_size
1428 # If the timestamp wasn't updated and the file size is still the same, carry
1429    # over the sha-1.
1430 if (prevdict.get('t') == out['t'] and
1431 prevdict.get('s') == out['s']):
1432 # Reuse the previous hash if available.
1433 out['h'] = prevdict.get('h')
1434 if not out.get('h'):
1435 out['h'] = hash_file(filepath, algo)
1436 else:
1437    # If the timestamp wasn't updated, carry over the link destination.
1438 if prevdict.get('t') == out['t']:
1439 # Reuse the previous link destination if available.
1440 out['l'] = prevdict.get('l')
1441 if out.get('l') is None:
1442 # The link could be in an incorrect path case. In practice, this only
1443      # happens on OSX on case-insensitive HFS.
1444 # TODO(maruel): It'd be better if it was only done once, in
1445      # expand_directory_and_symlink(), so it would not be necessary to do it
1446      # again here.
1447 symlink_value = os.readlink(filepath) # pylint: disable=E1101
1448 filedir = file_path.get_native_path_case(os.path.dirname(filepath))
1449 native_dest = file_path.fix_native_path_case(filedir, symlink_value)
1450 out['l'] = os.path.relpath(native_dest, filedir)
1451 return out
1452
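# Illustrative sketch (values hypothetical): for a regular rw-r--r-- file with
# read_only=0, process_input() returns a dict shaped like
#   {'m': 0640, 's': 1024, 't': 1389268800, 'h': '3f786850e3...'}
# ('other' access and group write are stripped per the code above), while for
# a symlink it returns {'t': ..., 'l': 'relative/target'} with no mode, size
# or hash.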
1453
1454def save_isolated(isolated, data):
1455 """Writes one or multiple .isolated files.
1456
1457  Note: this reference implementation does not create child .isolated files, so
1458  it always returns an empty list.
1459
1460 Returns the list of child isolated files that are included by |isolated|.
1461 """
1462 # Make sure the data is valid .isolated data by 'reloading' it.
1463 algo = SUPPORTED_ALGOS[data['algo']]
1464 load_isolated(json.dumps(data), data.get('flavor'), algo)
1465 tools.write_json(isolated, data, True)
1466 return []
1467
1468
1469
maruel@chromium.org7b844a62013-09-17 13:04:59 +00001470def upload_tree(base_url, indir, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001471 """Uploads the given tree to the given url.
1472
1473 Arguments:
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001474    base_url: The base url, it is assumed that |base_url|/has/ can be used to
1475 query if an element was already uploaded, and |base_url|/store/
1476 can be used to upload a new element.
1477 indir: Root directory the infiles are based in.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001478 infiles: dict of files to upload from |indir| to |base_url|.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001479 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001480 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001481 with get_storage(base_url, namespace) as storage:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001482 storage.upload_tree(indir, infiles)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001483 return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001484
1485
maruel@chromium.org41601642013-09-18 19:40:46 +00001486def load_isolated(content, os_flavor, algo):
1487  """Verifies the .isolated file is valid and returns the deserialized json
1488  data.
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001489
1490 Arguments:
1491 - content: raw serialized content to load.
1492 - os_flavor: OS to load this file on. Optional.
1493 - algo: hashlib algorithm class. Used to confirm the algorithm matches the
1494 algorithm used on the Isolate Server.
maruel@chromium.org41601642013-09-18 19:40:46 +00001495 """
1496 try:
1497 data = json.loads(content)
1498 except ValueError:
1499 raise ConfigError('Failed to parse: %s...' % content[:100])
1500
1501 if not isinstance(data, dict):
1502 raise ConfigError('Expected dict, got %r' % data)
1503
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001504  # Check 'version' first, since it could modify how the rest is parsed.
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001505 # TODO(maruel): Drop support for unversioned .isolated file around Jan 2014.
Marc-Antoine Ruel1c1edd62013-12-06 09:13:13 -05001506 value = data.get('version', ISOLATED_FILE_VERSION)
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001507 if not isinstance(value, basestring):
1508 raise ConfigError('Expected string, got %r' % value)
1509 if not re.match(r'^(\d+)\.(\d+)$', value):
1510 raise ConfigError('Expected a compatible version, got %r' % value)
Marc-Antoine Ruel1c1edd62013-12-06 09:13:13 -05001511 if value.split('.', 1)[0] != ISOLATED_FILE_VERSION.split('.', 1)[0]:
1512 raise ConfigError(
1513 'Expected compatible \'%s\' version, got %r' %
1514 (ISOLATED_FILE_VERSION, value))
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001515
1516 if algo is None:
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001517 # TODO(maruel): Remove the default around Jan 2014.
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001518  # Default to the algorithm used in the .isolated file itself, falling back
1519  # to 'sha-1' if unspecified.
1520 algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]
1521
maruel@chromium.org41601642013-09-18 19:40:46 +00001522 for key, value in data.iteritems():
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001523 if key == 'algo':
1524 if not isinstance(value, basestring):
1525 raise ConfigError('Expected string, got %r' % value)
1526 if value not in SUPPORTED_ALGOS:
1527 raise ConfigError(
1528 'Expected one of \'%s\', got %r' %
1529 (', '.join(sorted(SUPPORTED_ALGOS)), value))
1530 if value != SUPPORTED_ALGOS_REVERSE[algo]:
1531 raise ConfigError(
1532 'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))
1533
1534 elif key == 'command':
maruel@chromium.org41601642013-09-18 19:40:46 +00001535 if not isinstance(value, list):
1536 raise ConfigError('Expected list, got %r' % value)
1537 if not value:
1538 raise ConfigError('Expected non-empty command')
1539 for subvalue in value:
1540 if not isinstance(subvalue, basestring):
1541 raise ConfigError('Expected string, got %r' % subvalue)
1542
1543 elif key == 'files':
1544 if not isinstance(value, dict):
1545 raise ConfigError('Expected dict, got %r' % value)
1546 for subkey, subvalue in value.iteritems():
1547 if not isinstance(subkey, basestring):
1548 raise ConfigError('Expected string, got %r' % subkey)
1549 if not isinstance(subvalue, dict):
1550 raise ConfigError('Expected dict, got %r' % subvalue)
1551 for subsubkey, subsubvalue in subvalue.iteritems():
1552 if subsubkey == 'l':
1553 if not isinstance(subsubvalue, basestring):
1554 raise ConfigError('Expected string, got %r' % subsubvalue)
1555 elif subsubkey == 'm':
1556 if not isinstance(subsubvalue, int):
1557 raise ConfigError('Expected int, got %r' % subsubvalue)
1558 elif subsubkey == 'h':
1559 if not is_valid_hash(subsubvalue, algo):
1560 raise ConfigError('Expected sha-1, got %r' % subsubvalue)
1561 elif subsubkey == 's':
Marc-Antoine Ruelaab3a622013-11-28 09:47:05 -05001562 if not isinstance(subsubvalue, (int, long)):
1563 raise ConfigError('Expected int or long, got %r' % subsubvalue)
maruel@chromium.org41601642013-09-18 19:40:46 +00001564 else:
1565 raise ConfigError('Unknown subsubkey %s' % subsubkey)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001566 if bool('h' in subvalue) == bool('l' in subvalue):
maruel@chromium.org41601642013-09-18 19:40:46 +00001567 raise ConfigError(
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001568            'Need exactly one of \'h\' (sha-1) or \'l\' (link), got: %r' %
1569 subvalue)
1570 if bool('h' in subvalue) != bool('s' in subvalue):
1571 raise ConfigError(
1572 'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
1573 subvalue)
1574 if bool('s' in subvalue) == bool('l' in subvalue):
1575 raise ConfigError(
1576            'Need exactly one of \'s\' (size) or \'l\' (link), got: %r' %
1577 subvalue)
1578 if bool('l' in subvalue) and bool('m' in subvalue):
1579 raise ConfigError(
1580 'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
maruel@chromium.org41601642013-09-18 19:40:46 +00001581 subvalue)
1582
1583 elif key == 'includes':
1584 if not isinstance(value, list):
1585 raise ConfigError('Expected list, got %r' % value)
1586 if not value:
1587 raise ConfigError('Expected non-empty includes list')
1588 for subvalue in value:
1589 if not is_valid_hash(subvalue, algo):
1590 raise ConfigError('Expected sha-1, got %r' % subvalue)
1591
1592 elif key == 'read_only':
Marc-Antoine Ruel7124e392014-01-09 11:49:21 -05001593      if value not in (0, 1, 2):
1594 raise ConfigError('Expected 0, 1 or 2, got %r' % value)
maruel@chromium.org41601642013-09-18 19:40:46 +00001595
1596 elif key == 'relative_cwd':
1597 if not isinstance(value, basestring):
1598 raise ConfigError('Expected string, got %r' % value)
1599
1600 elif key == 'os':
1601 if os_flavor and value != os_flavor:
1602 raise ConfigError(
1603 'Expected \'os\' to be \'%s\' but got \'%s\'' %
1604 (os_flavor, value))
1605
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001606 elif key == 'version':
1607 # Already checked above.
1608 pass
1609
maruel@chromium.org41601642013-09-18 19:40:46 +00001610 else:
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001611 raise ConfigError('Unknown key %r' % key)
maruel@chromium.org41601642013-09-18 19:40:46 +00001612
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001613 # Automatically fix os.path.sep if necessary. While .isolated files are always
1614  # in the native path format, someone could want to download an .isolated
1615 # tree from another OS.
1616 wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
1617 if 'files' in data:
1618 data['files'] = dict(
1619 (k.replace(wrong_path_sep, os.path.sep), v)
1620 for k, v in data['files'].iteritems())
1621 for v in data['files'].itervalues():
1622 if 'l' in v:
1623 v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
1624 if 'relative_cwd' in data:
1625 data['relative_cwd'] = data['relative_cwd'].replace(
1626 wrong_path_sep, os.path.sep)
maruel@chromium.org41601642013-09-18 19:40:46 +00001627 return data
1628
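# Illustrative sketch of a minimal document accepted by load_isolated()
# (digests shortened; see the validation rules above):
#   {
#     'algo': 'sha-1',
#     'command': ['python', 'run_test.py'],
#     'files': {
#       'run_test.py': {'h': '3f786850e3...', 's': 342, 'm': 0644},
#       'data/link': {'l': 'link_target.txt'},
#     },
#     'read_only': 1,
#     'version': '1.3',
#   }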
1629
1630class IsolatedFile(object):
1631 """Represents a single parsed .isolated file."""
1632 def __init__(self, obj_hash, algo):
1633 """|obj_hash| is really the sha-1 of the file."""
1634    logging.debug('IsolatedFile(%s)', obj_hash)
1635 self.obj_hash = obj_hash
1636 self.algo = algo
1637 # Set once all the left-side of the tree is parsed. 'Tree' here means the
1638 # .isolate and all the .isolated files recursively included by it with
1639 # 'includes' key. The order of each sha-1 in 'includes', each representing a
1640 # .isolated file in the hash table, is important, as the later ones are not
1641    # processed until the first ones are retrieved and read.
1642 self.can_fetch = False
1643
1644 # Raw data.
1645 self.data = {}
1646 # A IsolatedFile instance, one per object in self.includes.
1647 self.children = []
1648
1649 # Set once the .isolated file is loaded.
1650 self._is_parsed = False
1651 # Set once the files are fetched.
1652 self.files_fetched = False
1653
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001654 def load(self, os_flavor, content):
maruel@chromium.org41601642013-09-18 19:40:46 +00001655 """Verifies the .isolated file is valid and loads this object with the json
1656 data.
1657 """
1658    logging.debug('IsolatedFile.load(%s)', self.obj_hash)
1659 assert not self._is_parsed
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001660 self.data = load_isolated(content, os_flavor, self.algo)
maruel@chromium.org41601642013-09-18 19:40:46 +00001661 self.children = [
1662 IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
1663 ]
1664 self._is_parsed = True
1665
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001666 def fetch_files(self, fetch_queue, files):
maruel@chromium.org41601642013-09-18 19:40:46 +00001667 """Adds files in this .isolated file not present in |files| dictionary.
1668
1669 Preemptively request files.
1670
1671 Note that |files| is modified by this function.
1672 """
1673 assert self.can_fetch
1674 if not self._is_parsed or self.files_fetched:
1675 return
1676    logging.debug('fetch_files(%s)', self.obj_hash)
1677 for filepath, properties in self.data.get('files', {}).iteritems():
1678 # Root isolated has priority on the files being mapped. In particular,
1679 # overriden files must not be fetched.
1680      # overridden files must not be fetched.
1681 files[filepath] = properties
1682 if 'h' in properties:
1683 # Preemptively request files.
1684          logging.debug('fetching %s', filepath)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001685 fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
maruel@chromium.org41601642013-09-18 19:40:46 +00001686 self.files_fetched = True
1687
1688
1689class Settings(object):
1690 """Results of a completely parsed .isolated file."""
1691 def __init__(self):
1692 self.command = []
1693 self.files = {}
1694 self.read_only = None
1695 self.relative_cwd = None
1696 # The main .isolated file, a IsolatedFile instance.
1697 self.root = None
1698
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001699 def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
maruel@chromium.org41601642013-09-18 19:40:46 +00001700 """Loads the .isolated and all the included .isolated asynchronously.
1701
1702 It enables support for "included" .isolated files. They are processed in
1703 strict order but fetched asynchronously from the cache. This is important so
1704 that a file in an included .isolated file that is overridden by an embedding
1705 .isolated file is not fetched needlessly. The includes are fetched in one
1706 pass and the files are fetched as soon as all the ones on the left-side
1707    of the tree have been fetched.
1708
1709 The prioritization is very important here for nested .isolated files.
1710 'includes' have the highest priority and the algorithm is optimized for both
1711 deep and wide trees. A deep one is a long link of .isolated files referenced
1712 one at a time by one item in 'includes'. A wide one has a large number of
1713 'includes' in a single .isolated file. 'left' is defined as an included
1714 .isolated file earlier in the 'includes' list. So the order of the elements
1715 in 'includes' is important.
1716 """
1717 self.root = IsolatedFile(root_isolated_hash, algo)
1718
1719 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1720 pending = {}
1721 # Set of hashes of already retrieved items to refuse recursive includes.
1722 seen = set()
1723
1724 def retrieve(isolated_file):
1725 h = isolated_file.obj_hash
1726 if h in seen:
1727 raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
1728 assert h not in pending
1729 seen.add(h)
1730 pending[h] = isolated_file
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001731 fetch_queue.add(WorkerPool.HIGH, h)
maruel@chromium.org41601642013-09-18 19:40:46 +00001732
1733 retrieve(self.root)
1734
1735 while pending:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001736 item_hash = fetch_queue.wait(pending)
maruel@chromium.org41601642013-09-18 19:40:46 +00001737 item = pending.pop(item_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001738 item.load(os_flavor, fetch_queue.cache.read(item_hash))
maruel@chromium.org41601642013-09-18 19:40:46 +00001739 if item_hash == root_isolated_hash:
1740 # It's the root item.
1741 item.can_fetch = True
1742
1743 for new_child in item.children:
1744 retrieve(new_child)
1745
1746 # Traverse the whole tree to see if files can now be fetched.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001747 self._traverse_tree(fetch_queue, self.root)
maruel@chromium.org41601642013-09-18 19:40:46 +00001748
1749 def check(n):
1750 return all(check(x) for x in n.children) and n.files_fetched
1751 assert check(self.root)
1752
1753 self.relative_cwd = self.relative_cwd or ''
maruel@chromium.org41601642013-09-18 19:40:46 +00001754
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001755 def _traverse_tree(self, fetch_queue, node):
maruel@chromium.org41601642013-09-18 19:40:46 +00001756 if node.can_fetch:
1757 if not node.files_fetched:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001758 self._update_self(fetch_queue, node)
maruel@chromium.org41601642013-09-18 19:40:46 +00001759 will_break = False
1760 for i in node.children:
1761 if not i.can_fetch:
1762 if will_break:
1763 break
1764          # Automatically mark the first one as fetchable.
1765 i.can_fetch = True
1766 will_break = True
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001767 self._traverse_tree(fetch_queue, i)
maruel@chromium.org41601642013-09-18 19:40:46 +00001768
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001769 def _update_self(self, fetch_queue, node):
1770 node.fetch_files(fetch_queue, self.files)
maruel@chromium.org41601642013-09-18 19:40:46 +00001771 # Grabs properties.
1772 if not self.command and node.data.get('command'):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001773 # Ensure paths are correctly separated on windows.
maruel@chromium.org41601642013-09-18 19:40:46 +00001774 self.command = node.data['command']
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001775 if self.command:
1776 self.command[0] = self.command[0].replace('/', os.path.sep)
1777 self.command = tools.fix_python_path(self.command)
maruel@chromium.org41601642013-09-18 19:40:46 +00001778 if self.read_only is None and node.data.get('read_only') is not None:
1779 self.read_only = node.data['read_only']
1780 if (self.relative_cwd is None and
1781 node.data.get('relative_cwd') is not None):
1782 self.relative_cwd = node.data['relative_cwd']
1783
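# Illustrative sketch of the traversal order: for a root .isolated A with
# 'includes': [B, C] where B itself includes D, Settings.load() fetches all
# four *.isolated files concurrently but marks them fetchable left to right
# (A, then B, then D, then C), so a file overridden by A is never requested
# on behalf of B, C or D.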
1784
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001785def fetch_isolated(
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001786 isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001787  """Aggressively downloads the .isolated file(s), then downloads all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001788
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001789 Arguments:
1790 isolated_hash: hash of the root *.isolated file.
1791 storage: Storage class that communicates with isolate storage.
1792 cache: LocalCache class that knows how to store and map files locally.
1793 algo: hash algorithm to use.
1794 outdir: Output directory to map file tree to.
1795 os_flavor: OS flavor to choose when reading sections of *.isolated file.
1796 require_command: Ensure *.isolated specifies a command to run.
1797
1798 Returns:
1799 Settings object that holds details about loaded *.isolated file.
1800 """
1801 with cache:
1802 fetch_queue = FetchQueue(storage, cache)
1803 settings = Settings()
1804
1805 with tools.Profiler('GetIsolateds'):
1806 # Optionally support local files by manually adding them to cache.
1807 if not is_valid_hash(isolated_hash, algo):
1808 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1809
1810 # Load all *.isolated and start loading rest of the files.
1811 settings.load(fetch_queue, isolated_hash, os_flavor, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001812 if require_command and not settings.command:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001813      # TODO(vadimsh): All fetch operations are already enqueued and there's no
1814 # easy way to cancel them.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001815 raise ConfigError('No command to run')
1816
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001817 with tools.Profiler('GetRest'):
1818 # Create file system hierarchy.
1819 if not os.path.isdir(outdir):
1820 os.makedirs(outdir)
1821 create_directories(outdir, settings.files)
Marc-Antoine Ruelccafe0e2013-11-08 16:15:36 -05001822 create_symlinks(outdir, settings.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001823
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001824 # Ensure working directory exists.
1825 cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
1826 if not os.path.isdir(cwd):
1827 os.makedirs(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001828
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001829 # Multimap: digest -> list of pairs (path, props).
1830 remaining = {}
1831 for filepath, props in settings.files.iteritems():
1832 if 'h' in props:
1833 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001834
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001835 # Now block on the remaining files to be downloaded and mapped.
1836 logging.info('Retrieving remaining files (%d of them)...',
1837 fetch_queue.pending_count)
1838 last_update = time.time()
1839 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1840 while remaining:
1841 detector.ping()
1842
1843 # Wait for any item to finish fetching to cache.
1844 digest = fetch_queue.wait(remaining)
1845
1846 # Link corresponding files to a fetched item in cache.
1847 for filepath, props in remaining.pop(digest):
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001848 cache.hardlink(
1849 digest, os.path.join(outdir, filepath), props.get('m'))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001850
1851 # Report progress.
1852 duration = time.time() - last_update
1853 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1854 msg = '%d files remaining...' % len(remaining)
1855 print msg
1856 logging.info(msg)
1857 last_update = time.time()
1858
1859    # Cache could evict some items we just tried to fetch; that's a fatal error.
1860 if not fetch_queue.verify_all_cached():
1861 raise MappingError('Cache is too small to hold all requested files')
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001862 return settings
1863
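# Minimal usage sketch mirroring CMDdownload below (server URL hypothetical):
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     settings = fetch_isolated(
#         isolated_hash, storage, MemoryCache(),
#         get_hash_algo('default-gzip'), 'out', None, False)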
1864
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001865def directory_to_metadata(root, algo, blacklist):
1866 """Returns the FileItem list and .isolated metadata for a directory."""
1867 root = file_path.get_native_path_case(root)
1868 metadata = dict(
1869 (relpath, process_input(
1870 os.path.join(root, relpath), {}, False, sys.platform, algo))
1871 for relpath in expand_directory_and_symlink(
1872 root, './', blacklist, True)
1873 )
1874 for v in metadata.itervalues():
1875 v.pop('t')
1876 items = [
1877 FileItem(
1878 path=os.path.join(root, relpath),
1879 digest=meta['h'],
1880 size=meta['s'],
1881 is_isolated=relpath.endswith('.isolated'))
1882 for relpath, meta in metadata.iteritems() if 'h' in meta
1883 ]
1884 return items, metadata
1885
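# Illustrative sketch (digest hypothetical): for a directory holding a single
# file 'a.txt', directory_to_metadata() returns one FileItem for 'a.txt' plus
# metadata like {'a.txt': {'h': '3f786850e3...', 's': 4, 'm': 0640}}, with the
# volatile 't' timestamp stripped so the generated .isolated file stays
# deterministic.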
1886
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001887def archive_files_to_storage(storage, algo, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001888  """Stores every entry and returns the relevant data.
1889
1890 Arguments:
1891 storage: a Storage object that communicates with the remote object store.
1892    algo: a hashlib class to hash content. Usually hashlib.sha1.
1893 files: list of file paths to upload. If a directory is specified, a
1894 .isolated file is created and its hash is returned.
1895 blacklist: function that returns True if a file should be omitted.
1896 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001897 assert all(isinstance(i, unicode) for i in files), files
1898 if len(files) != len(set(map(os.path.abspath, files))):
1899 raise Error('Duplicate entries found.')
1900
1901 results = []
1902 # The temporary directory is only created as needed.
1903 tempdir = None
1904 try:
1905 # TODO(maruel): Yield the files to a worker thread.
1906 items_to_upload = []
1907 for f in files:
1908 try:
1909 filepath = os.path.abspath(f)
1910 if os.path.isdir(filepath):
1911 # Uploading a whole directory.
1912 items, metadata = directory_to_metadata(filepath, algo, blacklist)
1913
1914 # Create the .isolated file.
1915 if not tempdir:
1916 tempdir = tempfile.mkdtemp(prefix='isolateserver')
1917 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
1918 os.close(handle)
1919 data = {
1920 'algo': SUPPORTED_ALGOS_REVERSE[algo],
1921 'files': metadata,
Marc-Antoine Ruel1c1edd62013-12-06 09:13:13 -05001922 'version': ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001923 }
1924 save_isolated(isolated, data)
1925 h = hash_file(isolated, algo)
1926 items_to_upload.extend(items)
1927 items_to_upload.append(
1928 FileItem(
1929 path=isolated,
1930 digest=h,
1931 size=os.stat(isolated).st_size,
1932 is_isolated=True))
1933 results.append((h, f))
1934
1935 elif os.path.isfile(filepath):
1936 h = hash_file(filepath, algo)
1937 items_to_upload.append(
1938 FileItem(
1939 path=filepath,
1940 digest=h,
1941 size=os.stat(filepath).st_size,
1942 is_isolated=f.endswith('.isolated')))
1943 results.append((h, f))
1944 else:
1945          raise Error('%s is neither a file nor a directory.' % f)
1946 except OSError:
1947 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001948    # Technically we could care about which files were uploaded, but in
1949    # practice we don't.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001950 _uploaded_files = storage.upload_items(items_to_upload)
1951 return results
1952 finally:
1953 if tempdir:
1954 shutil.rmtree(tempdir)
1955
1956
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001957def archive(out, namespace, files, blacklist):
1958 if files == ['-']:
1959 files = sys.stdin.readlines()
1960
1961 if not files:
1962 raise Error('Nothing to upload')
1963
1964 files = [f.decode('utf-8') for f in files]
1965 algo = get_hash_algo(namespace)
1966 blacklist = tools.gen_blacklist(blacklist)
1967 with get_storage(out, namespace) as storage:
1968 results = archive_files_to_storage(storage, algo, files, blacklist)
1969 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1970
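# Illustrative usage (server URL and output hypothetical): archiving one
# directory prints the hash of its generated .isolated file and the path:
#   archive('https://isolate.example.com', 'default-gzip', ['out/data'], [])
# might print something like '24afe0b5f7c0... out/data'.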
1971
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001972@subcommand.usage('<file1..fileN> or - to read from stdin')
1973def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001974 """Archives data to the server.
1975
1976  If a directory is specified, a .isolated file is created and the whole
1977  directory is uploaded. Then this .isolated file can be included in another
1978  one to run commands.
1979
1980  The command outputs each file that was processed with its content hash. For
1981 directories, the .isolated generated for the directory is listed as the
1982 directory entry itself.
1983 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001984 add_isolate_server_options(parser, False)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001985 parser.add_option(
1986 '--blacklist',
1987 action='append', default=list(DEFAULT_BLACKLIST),
1988 help='List of regexp to use as blacklist filter when uploading '
1989 'directories')
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001990 options, files = parser.parse_args(args)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001991 process_isolate_server_options(parser, options)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001992 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001993 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001994 except Error as e:
1995 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001996 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001997
1998
1999def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002000 """Download data from the server.
2001
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002002 It can either download individual files or a complete tree from a .isolated
2003 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002004 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002005 add_isolate_server_options(parser, True)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002006 parser.add_option(
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002007 '-i', '--isolated', metavar='HASH',
2008 help='hash of an isolated file, .isolated file content is discarded, use '
2009 '--file if you need it')
2010 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002011 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
2012 help='hash and destination of a file, can be used multiple times')
2013 parser.add_option(
2014 '-t', '--target', metavar='DIR', default=os.getcwd(),
2015 help='destination directory')
2016 options, args = parser.parse_args(args)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002017 process_isolate_server_options(parser, options)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002018 if args:
2019 parser.error('Unsupported arguments: %s' % args)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002020 if bool(options.isolated) == bool(options.file):
2021 parser.error('Use one of --isolated or --file, and only one.')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002022
2023 options.target = os.path.abspath(options.target)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002024
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002025 remote = options.isolate_server or options.indir
2026 with get_storage(remote, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08002027 # Fetching individual files.
2028 if options.file:
2029 channel = threading_utils.TaskChannel()
2030 pending = {}
2031 for digest, dest in options.file:
2032 pending[digest] = dest
2033 storage.async_fetch(
2034 channel,
2035 WorkerPool.MED,
2036 digest,
2037 UNKNOWN_FILE_SIZE,
2038 functools.partial(file_write, os.path.join(options.target, dest)))
2039 while pending:
2040 fetched = channel.pull()
2041 dest = pending.pop(fetched)
2042 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002043
Vadim Shtayura3172be52013-12-03 12:49:05 -08002044 # Fetching whole isolated tree.
2045 if options.isolated:
2046 settings = fetch_isolated(
2047 isolated_hash=options.isolated,
2048 storage=storage,
2049 cache=MemoryCache(),
2050 algo=get_hash_algo(options.namespace),
2051 outdir=options.target,
2052 os_flavor=None,
2053 require_command=False)
2054 rel = os.path.join(options.target, settings.relative_cwd)
2055        print(
2056            'To run this test please run from the directory %s:' % rel)
2057 print(' ' + ' '.join(settings.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002058
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002059 return 0
2060
2061
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002062@subcommand.usage('<file1..fileN> or - to read from stdin')
2063def CMDhashtable(parser, args):
2064 """Archives data to a hashtable on the file system.
2065
2066  If a directory is specified, a .isolated file is created and the whole
2067  directory is uploaded. Then this .isolated file can be included in another
2068  one to run commands.
2069
2070  The command outputs each file that was processed with its content hash. For
2071 directories, the .isolated generated for the directory is listed as the
2072 directory entry itself.
2073 """
2074 add_outdir_options(parser)
2075 parser.add_option(
2076 '--blacklist',
2077 action='append', default=list(DEFAULT_BLACKLIST),
2078 help='List of regexp to use as blacklist filter when uploading '
2079 'directories')
2080 options, files = parser.parse_args(args)
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002081 process_outdir_options(parser, options, os.getcwd())
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002082 try:
2083 # Do not compress files when archiving to the file system.
2084 archive(options.outdir, 'default', files, options.blacklist)
2085 except Error as e:
2086 parser.error(e.args[0])
2087 return 0
2088
2089
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002090def add_isolate_server_options(parser, add_indir):
2091 """Adds --isolate-server and --namespace options to parser.
2092
2093 Includes --indir if desired.
2094 """
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002095 parser.add_option(
2096 '-I', '--isolate-server',
2097 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002098 help='URL of the Isolate Server to use. Defaults to the environment '
2099 'variable ISOLATE_SERVER if set. No need to specify https://, this '
2100 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002101 parser.add_option(
2102 '--namespace', default='default-gzip',
2103 help='The namespace to use on the Isolate Server, default: %default')
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002104 if add_indir:
2105 parser.add_option(
2106 '--indir', metavar='DIR',
2107 help='Directory used to store the hashtable instead of using an '
2108 'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002109
2110
2111def process_isolate_server_options(parser, options):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002112 """Processes the --isolate-server and --indir options and aborts if neither is
2113 specified.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002114 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002115 has_indir = hasattr(options, 'indir')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002116 if not options.isolate_server:
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002117 if not has_indir:
2118 parser.error('--isolate-server is required.')
2119 elif not options.indir:
2120 parser.error('Use one of --indir or --isolate-server.')
2121 else:
2122 if has_indir and options.indir:
2123 parser.error('Use only one of --indir or --isolate-server.')
2124
2125 if options.isolate_server:
2126 parts = urlparse.urlparse(options.isolate_server, 'https')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002127 if parts.query:
2128 parser.error('--isolate-server doesn\'t support query parameter.')
2129 if parts.fragment:
2130 parser.error('--isolate-server doesn\'t support fragment in the url.')
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002131 # urlparse('foo.com') will result in netloc='', path='foo.com', which is not
2132 # what is desired here.
2133 new = list(parts)
2134 if not new[1] and new[2]:
2135 new[1] = new[2].rstrip('/')
2136 new[2] = ''
2137 new[2] = new[2].rstrip('/')
2138 options.isolate_server = urlparse.urlunparse(new)
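    # Hedged example of the normalization above: 'foo.com' becomes
    # 'https://foo.com' and 'https://foo.com/' loses its trailing slash.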
2139 return
2140
2141 if file_path.is_url(options.indir):
2142    parser.error('Can\'t use a URL for --indir.')
2143 options.indir = unicode(options.indir).replace('/', os.path.sep)
2144 options.indir = os.path.abspath(
2145 os.path.normpath(os.path.join(os.getcwd(), options.indir)))
2146 if not os.path.isdir(options.indir):
2147 parser.error('Path given to --indir must exist.')
2148
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002149
2150
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002151def add_outdir_options(parser):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002152 """Adds --outdir, which is orthogonal to --isolate-server.
2153
2154 Note: On upload, separate commands are used between 'archive' and 'hashtable'.
2155 On 'download', the same command can download from either an isolate server or
2156 a file system.
2157 """
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002158 parser.add_option(
2159 '-o', '--outdir', metavar='DIR',
2160 help='Directory used to recreate the tree.')
2161
2162
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002163def process_outdir_options(parser, options, cwd):
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002164 if not options.outdir:
2165 parser.error('--outdir is required.')
2166 if file_path.is_url(options.outdir):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002167    parser.error('Can\'t use a URL for --outdir.')
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002168 options.outdir = unicode(options.outdir).replace('/', os.path.sep)
2169 # outdir doesn't need native path case since tracing is never done from there.
2170 options.outdir = os.path.abspath(
2171 os.path.normpath(os.path.join(cwd, options.outdir)))
2172 # In theory, we'd create the directory outdir right away. Defer doing it in
2173  # case there are errors in the command line.
2174
2175
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002176class OptionParserIsolateServer(tools.OptionParserWithLogging):
2177 def __init__(self, **kwargs):
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002178 tools.OptionParserWithLogging.__init__(
2179 self,
2180 version=__version__,
2181 prog=os.path.basename(sys.modules[__name__].__file__),
2182 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002183 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002184
2185 def parse_args(self, *args, **kwargs):
2186 options, args = tools.OptionParserWithLogging.parse_args(
2187 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002188 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002189 return options, args
2190
2191
2192def main(args):
2193 dispatcher = subcommand.CommandDispatcher(__name__)
2194 try:
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002195 return dispatcher.execute(OptionParserIsolateServer(), args)
vadimsh@chromium.orgd908a542013-10-30 01:36:17 +00002196 except Exception as e:
2197 tools.report_error(e)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002198 return 1
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002199
2200
2201if __name__ == '__main__':
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002202 fix_encoding.fix_encoding()
2203 tools.disable_buffering()
2204 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002205 sys.exit(main(sys.argv[1:]))