blob: 8a74b809ddf0110a9962c6db02a1066950d5e98d [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05006"""Archives a set of files or directories to a server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05008__version__ = '0.3.2'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import hashlib
maruel@chromium.org41601642013-09-18 19:40:46 +000012import json
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050016import shutil
17import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050019import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000020import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000022import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050023import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000024import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000025
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000026from third_party import colorama
27from third_party.depot_tools import fix_encoding
28from third_party.depot_tools import subcommand
29
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050030from utils import file_path
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000031from utils import net
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000032from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000033from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000034
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080035import auth
36
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000037
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'
# Version stored and expected in .isolated files.
ISOLATED_FILE_VERSION = '1.3'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0]
# files are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousands files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
    'md5': hashlib.md5,
    'sha-1': hashlib.sha1,
    'sha-512': hashlib.sha512,
}


# Used for serialization: maps a hash constructor back to its wire name.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())


# Regexps of paths not worth archiving, matched against each relative path.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
  r'^.+\.(?:run_test_cases)$',
  r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
123
124
class Error(Exception):
  """Base class for generic runtime errors raised by this module."""
128
129
class ConfigError(ValueError):
  """Raised on a generic failure to load a .isolated file."""
133
134
class MappingError(OSError):
  """Raised when the file tree could not be recreated on disk."""
138
139
def is_valid_hash(value, algo):
  """Checks whether |value| is a well-formed hex digest for hash class |algo|.

  A valid digest is exactly digest_size*2 hexadecimal characters (upper or
  lower case).
  """
  expected_len = algo().digest_size * 2
  pattern = r'^[a-fA-F0-9]{%d}$' % expected_len
  return re.match(pattern, value) is not None
144
145
def hash_file(filepath, algo):
  """Computes the hex digest of |filepath| using hash algorithm |algo|.

  |algo| should be one of hashlib hashing algorithm. The file is streamed in
  fixed-size chunks so arbitrarily large files can be hashed without loading
  them fully into memory.
  """
  hasher = algo()
  with open(filepath, 'rb') as stream:
    chunk = stream.read(DISK_FILE_CHUNK)
    while chunk:
      hasher.update(chunk)
      chunk = stream.read(DISK_FILE_CHUNK)
  return hasher.hexdigest()
159
160
def stream_read(stream, chunk_size):
  """Yields successive pieces of up to |chunk_size| bytes read from |stream|."""
  piece = stream.read(chunk_size)
  while piece:
    yield piece
    piece = stream.read(chunk_size)
168
169
def file_read(filepath, chunk_size=DISK_FILE_CHUNK, offset=0):
  """Yields the content of |filepath| in |chunk_size| pieces, from |offset| on."""
  with open(filepath, 'rb') as stream:
    if offset:
      # Skip the prefix the caller is not interested in.
      stream.seek(offset)
    piece = stream.read(chunk_size)
    while piece:
      yield piece
      piece = stream.read(chunk_size)
180
181
def file_write(filepath, content_generator):
  """Streams chunks produced by |content_generator| into a file at |filepath|.

  Any missing intermediary directories are created first.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent = os.path.dirname(filepath)
  if not os.path.isdir(parent):
    os.makedirs(parent)
  written = 0
  with open(filepath, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000200
201
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zlib-compressed chunks."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  # Emit whatever the compressor buffered plus the stream terminator.
  remainder = deflater.flush(zlib.Z_FINISH)
  if remainder:
    yield remainder
212
213
def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Asks the decompressor for at most |chunk_size| output bytes at a time so a
  "zip bomb" cannot force zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  total_in = 0
  try:
    for piece in content_generator:
      total_in += len(piece)
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      # Drain any input zlib withheld to honor the chunk_size cap.
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (total_in, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
243
244
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store) for file types that are already compressed, 7 otherwise.
  """
  # os.path.splitext() keeps the leading dot ('.zip') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions ('zip'), so the dot must be
  # stripped before the membership test; without it the test never matched and
  # every file was compressed at level 7.
  file_ext = os.path.splitext(filename)[1].lstrip('.').lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
250
251
def create_directories(base_directory, files):
  """Creates every directory needed to hold the given relative file paths."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect each file's ancestor directories, all the way up to the root.
  needed = set()
  for relpath in files:
    parent = os.path.dirname(relpath)
    while parent:
      needed.add(parent)
      parent = os.path.dirname(parent)
  # Lexicographic order guarantees parents are created before children.
  for rel_dir in sorted(needed):
    os.mkdir(os.path.join(base_directory, rel_dir))
264
265
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for relpath, props in files:
    if 'l' not in props:
      # Not a symlink entry: nothing to do for this file.
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
      continue
    # os.symlink() doesn't exist on Windows.
    link_path = os.path.join(base_directory, relpath)
    os.symlink(props['l'], link_path)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000278
279
def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size against |size|.
  """
  if size == UNKNOWN_FILE_SIZE:
    # No expected size available; existence is the best check possible.
    return os.path.isfile(filepath)
  found_size = os.stat(filepath).st_size
  if found_size == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), found_size, size)
  return False
294
295
class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # How many times a task is retried on IOError before giving up.
  RETRIES = 5

  def __init__(self):
    super(WorkerPool, self).__init__(
        [IOError], self.RETRIES, self.INITIAL_WORKERS, self.MAX_WORKERS, 0,
        'remote')
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000313
314
class Item(object):
  """An item to push to Storage.

  It starts its life in a main thread, travels to 'contains' thread, then to
  'push' thread and then finally back to the main thread.

  It is never used concurrently from multiple threads.
  """

  def __init__(self, digest, size, is_isolated=False):
    # Opaque state attached by StorageApi during upload.
    self.push_state = None
    # Default zlib level; subclasses may tune it per content type.
    self.compression_level = 6
    # Hex digest of the item's content.
    self.digest = digest
    # Size of the content in bytes.
    self.size = size
    # True for .isolated files, which get higher upload priority.
    self.is_isolated = is_isolated

  def content(self, chunk_size):
    """Iterable with content of this item in chunks of given size.

    Arguments:
      chunk_size: preferred size of the chunk to produce, may be ignored.
    """
    raise NotImplementedError()
338
339
class FileItem(Item):
  """A file on disk to push to Storage."""

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    # Path of the file to stream content from.
    self.path = path
    # Already-compressed file types are stored (level 0) rather than deflated.
    self.compression_level = get_zip_compression_level(path)

  def content(self, chunk_size):
    """Streams the file content from disk in |chunk_size| pieces."""
    return file_read(self.path, chunk_size)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000350
351
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, algo, is_isolated=False):
    digest = algo(buf).hexdigest()
    super(BufferItem, self).__init__(digest, len(buf), is_isolated)
    self.buffer = buf

  def content(self, _chunk_size):
    # The whole buffer is produced as a single chunk; chunk size is ignored.
    return [self.buffer]
362
363
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Wraps a StorageApi implementation with two lazily-created thread pools (one
  for network I/O, one for zipping) so that many items can be checked for
  presence, compressed and transferred concurrently.  Usable as a context
  manager: exiting the 'with' block waits for all pending tasks.
  """

  def __init__(self, storage_api, use_zip):
    # When True, content is zlib-compressed on upload and decompressed on
    # download.
    self.use_zip = use_zip
    # Object implementing fetch/push/contains/get_fetch_url.
    self._storage_api = storage_api
    # Both pools are created lazily by the properties below.
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface; never suppresses exceptions."""
    self.close()
    return False

  def upload_tree(self, indir, infiles):
    """Uploads the given tree to the isolate server.

    Arguments:
      indir: root directory the infiles are based in.
      infiles: dict of files to upload from |indir|, mapping relative path to
          a metadata dict with 'h' (digest), 's' (size) and optionally
          'priority' and 'l' (symlink target).

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))

    # Convert |indir| + |infiles| into a list of FileItem objects.
    # Filter out symlinks, since they are not represented by items on isolate
    # server side.
    items = [
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            is_isolated=metadata.get('priority') == '0')
        for filepath, metadata in infiles.iteritems()
        if 'l' not in metadata
    ]

    return self.upload_items(items)

  def upload_items(self, items):
    """Uploads bunch of items to the isolate server.

    Will upload only items that are missing.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks. Isolated files get HIGH priority so clients
    # waiting on them can proceed sooner.
    missing = set()
    channel = threading_utils.TaskChannel()
    for missing_item in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(
          channel,
          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
          missing_item)

    uploaded = []
    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    # NOTE(review): the ratios below divide by |total| (and |total_size|);
    # calling this with an empty |items| would raise ZeroDivisionError on the
    # |total| division — confirm callers never pass an empty list.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    return self._storage_api.get_fetch_url(digest)

  def async_push(self, channel, priority, item):
    """Starts asynchronous push to the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      priority: thread pool task priority for the push.
      item: item to upload as instance of Item class.
    """
    def push(content):
      """Pushes an item and returns its id, to pass as a result to |channel|."""
      self._storage_api.push(item, content)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push,
          item.content(DISK_FILE_CHUNK))
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
                              item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Surface the failure to whoever is waiting on |channel|.
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self.use_zip:
          stream = zip_decompress(stream, DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        # Re-raise so the thread pool can retry IOError or report the failure.
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      Item objects that are missing from the server.
    """
    channel = threading_utils.TaskChannel()
    pending = 0
    # Enqueue all requests.
    for batch in self.batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1
    # Yield results as they come in.
    for _ in xrange(pending):
      for missing in channel.pull():
        yield missing

  @staticmethod
  def batch_items_for_check(items):
    """Splits list of items to check for existence on the server into batches.

    Each batch corresponds to a single 'exists?' query to the server via a call
    to StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects.

    Yields:
      Batches of items to query for existence in a single operation,
      each batch is a list of Item objects.
    """
    batch_count = 0
    batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
    next_queries = []
    # Largest items first: per the ITEMS_PER_CONTAINS_QUERIES comment, larger
    # files are the most likely to have changed, so query them in the early,
    # smaller batches to start uploads sooner.
    for item in sorted(items, key=lambda x: x.size, reverse=True):
      next_queries.append(item)
      if len(next_queries) == batch_size_limit:
        yield next_queries
        next_queries = []
        batch_count += 1
        batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    if next_queries:
      yield next_queries
637
638
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    # Channel on which finished fetches report their digest.
    self._channel = threading_utils.TaskChannel()
    # Digests whose fetch has started but not yet completed.
    self._pending = set()
    # Every digest ever requested via add(); used by verify_all_cached().
    self._accessed = set()
    # Digests already present in the cache.
    self._fetched = cache.cached_set()

  def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching; the fetched content is streamed straight into the cache.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
730
731
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache.

  Wraps a chunk stream (as produced by StorageApi.fetch) and re-yields it,
  checking the total size against |expected_size| before the final chunk is
  handed to the consumer.
  """

  def __init__(self, stream, expected_size):
    # Iterable of chunks to verify and pass through.
    self.stream = stream
    # Expected total size in bytes; UNKNOWN_FILE_SIZE disables the check.
    self.expected_size = expected_size
    # Number of bytes seen so far.
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Previously the inspect/yield/wrap sequence was duplicated for the last
    # chunk; the read-ahead helper lets a single code path handle both cases.
    for is_last, chunk in self._read_ahead():
      self._inspect_chunk(chunk, is_last=is_last)
      try:
        yield chunk
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _read_ahead(self):
    """Yields (is_last, chunk) pairs, reading one chunk ahead of the consumer.

    Buffering one chunk makes it possible to know whether a chunk is the final
    one before handing it out, so the completeness check in run() happens
    before the consumer commits the last piece.
    """
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        yield False, stored
      stored = chunk
    if stored is not None:
      yield True, stored

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
775
776
class StorageApi(object):
  """Interface for classes that implement low-level storage operations."""

  def get_fetch_url(self, digest):
    """Maps a digest to a direct-download URL.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      URL string, or None when the underlying protocol has no notion of URLs.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Downloads an item and yields its content in chunks.

    Arguments:
      digest: hash digest of item to download.
      offset: byte offset into the item at which to resume the download.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, content):
    """Uploads an |item| whose data is produced by the |content| generator.

    Arguments:
      item: Item object that describes what is being pushed.
      content: generator yielding chunks of the item's data.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks which of the given |items| are missing from the server.

    Mutates |items|: entries missing on the server get an implementation
    specific object assigned to their push_state attribute, to be consumed
    later by push().

    Arguments:
      items: list of Item objects.

    Returns:
      The subset of |items| not present on the server, as a list of Item.
    """
    raise NotImplementedError()
828
829
Mike Frysinger27f03da2014-02-12 16:47:01 -0500830class _PushState(object):
831 """State needed to call .push(), to be stored in Item.push_state.
832
833 Note this needs to be a global class to support pickling.
834 """
835
836 def __init__(self, upload_url, finalize_url):
837 self.upload_url = upload_url
838 self.finalize_url = finalize_url
839 self.uploaded = False
840 self.finalized = False
841
842
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  """

  def __init__(self, base_url, namespace):
    """Arguments:
      base_url: root URL of the isolate server, must start with 'http'.
      namespace: server-side namespace items belong to.
    """
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self.base_url = base_url.rstrip('/')
    self.namespace = namespace
    # Guards lazy initialization of |_server_caps| in _server_capabilities.
    self._lock = threading.Lock()
    # Cached /handshake response; None until the first handshake is done.
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if self._server_caps is None:
        request_body = json.dumps(
            self._generate_handshake_request(), separators=(',', ':'))
        response = net.url_read(
            url=self.base_url + '/content-gs/handshake',
            data=request_body,
            content_type='application/json',
            method='POST')
        if response is None:
          raise MappingError('Failed to perform handshake.')
        try:
          caps = json.loads(response)
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise MappingError('Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  def get_fetch_url(self, digest):
    """Returns the /content-gs/retrieve URL for |digest| in this namespace."""
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self.base_url, self.namespace, digest)

  def fetch(self, digest, offset=0):
    """Streams an item's content from the server, optionally from |offset|.

    Returns a chunk generator; raises IOError on connection failure or when
    the server's Content-Range does not match the requested |offset|.
    """
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        source_url,
        retry_404=True,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, content):
    """Uploads |item| via PUT, then optionally POSTs a finalize request.

    |item.push_state| must be a _PushState previously attached by contains().
    Skips the content upload on retries where only finalization failed.
    """
    assert isinstance(item, Item)
    assert isinstance(item.push_state, _PushState)
    assert not item.push_state.finalized

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not item.push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=item.push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, item.push_state.upload_url))
      item.push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if item.push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      response = net.url_read(
          url=item.push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      item.push_state.finalized = True

  def contains(self, items):
    """Queries /pre-upload to find which of |items| are missing on the server.

    Attaches a _PushState (upload URL + optional finalize URL) to each missing
    item and returns the list of missing items. Raises MappingError when the
    query fails or the response is malformed.
    """
    logging.info('Checking existence of %d files...', len(items))

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.is_isolated),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self.base_url,
        self.namespace,
        urllib.quote(self._server_capabilities['access_token']))
    response_body = net.url_read(
        url=query_url,
        data=json.dumps(body, separators=(',', ':')),
        content_type='application/json',
        method='POST')
    if response_body is None:
      raise MappingError('Failed to execute /pre-upload query')

    # Response body is a list of push_urls (or null if file is already present).
    try:
      response = json.loads(response_body)
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response_body))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = []
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        item = items[i]
        assert item.push_state is None
        item.push_state = _PushState(push_urls[0], push_urls[1])
        missing_items.append(item)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001065
1066
class FileSystem(StorageApi):
  """StorageApi implementation that fetches data from the file system.

  The common use case is a NFS/CIFS file server that is mounted locally that is
  used to fetch the file on a local partition.
  """

  def __init__(self, base_path):
    super(FileSystem, self).__init__()
    self.base_path = base_path

  def _item_path(self, digest):
    """Absolute path of the file holding the item with the given digest."""
    return os.path.join(self.base_path, digest)

  def get_fetch_url(self, digest):
    # Plain file system storage has no URL-based retrieval.
    return None

  def fetch(self, digest, offset=0):
    assert isinstance(digest, basestring)
    return file_read(self._item_path(digest), offset=offset)

  def push(self, item, content):
    assert isinstance(item, Item)
    file_write(self._item_path(item.digest), content)

  def contains(self, items):
    missing = []
    for item in items:
      if not os.path.exists(self._item_path(item.digest)):
        missing.append(item)
    return missing
1094
1095
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  # Optional on-disk location of the cache; None for caches without one.
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a new set of all cached digests."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()
1147
1148
class MemoryCache(LocalCache):
  """LocalCache implementation that keeps all items in a plain dict."""

  def __init__(self):
    super(MemoryCache, self).__init__()
    # Guards |_contents|; dict is not assumed to be thread safe.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    # Memory does not get corrupted, so presence is the only check needed.
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Join the chunk stream outside the critical section.
    blob = ''.join(content)
    with self._lock:
      self._contents[digest] = blob

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no filenode to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      # Ignores all other bits; keeps only owner read/execute (0o500).
      os.chmod(dest, file_mode & 0o500)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001186
1187
def get_hash_algo(_namespace):
  """Returns the hashing constructor used for items in |_namespace|.

  Currently always SHA-1, independent of the namespace.
  """
  # TODO(vadimsh): Implement this at some point.
  return hashlib.sha1
1192
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001193
def is_namespace_with_compression(namespace):
  """Returns True if given |namespace| stores compressed objects."""
  return namespace.endswith('-gzip') or namespace.endswith('-deflate')
1197
1198
def get_storage_api(file_or_url, namespace):
  """Returns an object that implements the StorageApi interface.

  URLs map to IsolateServer; anything else is treated as a local or mounted
  directory path and maps to FileSystem.
  """
  if not file_path.is_url(file_or_url):
    return FileSystem(file_or_url)
  return IsolateServer(file_or_url, namespace)
1205
1206
def get_storage(file_or_url, namespace):
  """Returns Storage class configured with appropriate StorageApi instance."""
  api = get_storage_api(file_or_url, namespace)
  use_compression = is_namespace_with_compression(namespace)
  return Storage(api, use_compression)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001212
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001213
def expand_symlinks(indir, relfile):
  """Follows symlinks in |relfile|, but treating symlinks that point outside the
  build tree as if they were ordinary directories/files. Returns the final
  symlink-free target and a list of paths to symlinks encountered in the
  process.

  The rule about symlinks outside the build tree is for the benefit of the
  Chromium OS ebuild, which symlinks the output directory to an unrelated path
  in the chroot.

  Fails when a directory loop is detected, although in theory we could support
  that case.
  """
  # Remember whether the input denoted a directory, so the trailing separator
  # can be restored on the result at the end.
  is_directory = relfile.endswith(os.path.sep)
  # Loop invariant: |done| is an already-resolved prefix rooted at |indir| and
  # |todo| is the path remainder (relative to |done|) still to be scanned.
  done = indir
  todo = relfile.strip(os.path.sep)
  symlinks = []

  while todo:
    pre_symlink, symlink, post_symlink = file_path.split_at_symlink(
        done, todo)
    if not symlink:
      # No symlink left in |todo|: fix the case of the remainder and finish.
      todo = file_path.fix_native_path_case(done, todo)
      done = os.path.join(done, todo)
      break
    symlink_path = os.path.join(done, pre_symlink, symlink)
    post_symlink = post_symlink.lstrip(os.path.sep)
    # readlink doesn't exist on Windows.
    # pylint: disable=E1101
    target = os.path.normpath(os.path.join(done, pre_symlink))
    symlink_target = os.readlink(symlink_path)
    if os.path.isabs(symlink_target):
      # Absolute path are considered a normal directories. The use case is
      # generally someone who puts the output directory on a separate drive.
      target = symlink_target
    else:
      # The symlink itself could be using the wrong path case.
      target = file_path.fix_native_path_case(target, symlink_target)

    if not os.path.exists(target):
      raise MappingError(
          'Symlink target doesn\'t exist: %s -> %s' % (symlink_path, target))
    target = file_path.get_native_path_case(target)
    if not file_path.path_starts_with(indir, target):
      # Target points outside the build tree: treat the symlink as an ordinary
      # file/directory and continue scanning past it.
      done = symlink_path
      todo = post_symlink
      continue
    if file_path.path_starts_with(target, symlink_path):
      # The symlink target contains the symlink itself: a loop.
      raise MappingError(
          'Can\'t map recursive symlink reference %s -> %s' %
          (symlink_path, target))
    logging.info('Found symlink: %s -> %s', symlink_path, target)
    symlinks.append(os.path.relpath(symlink_path, indir))
    # Treat the common prefix of the old and new paths as done, and start
    # scanning again.
    target = target.split(os.path.sep)
    symlink_path = symlink_path.split(os.path.sep)
    prefix_length = 0
    for target_piece, symlink_path_piece in zip(target, symlink_path):
      if target_piece == symlink_path_piece:
        prefix_length += 1
      else:
        break
    done = os.path.sep.join(target[:prefix_length])
    todo = os.path.join(
        os.path.sep.join(target[prefix_length:]), post_symlink)

  relfile = os.path.relpath(done, indir)
  # |is_directory| is a bool: multiplying restores the trailing separator only
  # when the input had one.
  relfile = relfile.rstrip(os.path.sep) + is_directory * os.path.sep
  return relfile, symlinks
1284
1285
def expand_directory_and_symlink(indir, relfile, blacklist, follow_symlinks):
  """Expands a single input. It can result in multiple outputs.

  This function is recursive when relfile is a directory.

  Note: this code doesn't properly handle recursive symlink like one created
  with:
    ln -s .. foo

  Arguments:
    indir: root directory all paths are relative to.
    relfile: single file or directory (trailing os.path.sep) to expand.
    blacklist: optional callable returning True for paths to skip; only
               applied to directory contents, never to |relfile| itself.
    follow_symlinks: whether to resolve symlinks via expand_symlinks().

  Returns:
    List of relative paths (symlinks first, then files).

  Raises:
    MappingError on absolute/escaping paths, path case mismatches (except on
    OSX), missing files or listing failures.
  """
  if os.path.isabs(relfile):
    raise MappingError('Can\'t map absolute path %s' % relfile)

  infile = file_path.normpath(os.path.join(indir, relfile))
  if not infile.startswith(indir):
    raise MappingError('Can\'t map file %s outside %s' % (infile, indir))

  filepath = os.path.join(indir, relfile)
  native_filepath = file_path.get_native_path_case(filepath)
  if filepath != native_filepath:
    # Special case './'.
    if filepath != native_filepath + '.' + os.path.sep:
      # Give up enforcing strict path case on OSX. Really, it's that sad. The
      # case where it happens is very specific and hard to reproduce:
      # get_native_path_case(
      #    u'Foo.framework/Versions/A/Resources/Something.nib') will return
      # u'Foo.framework/Versions/A/resources/Something.nib', e.g. lowercase 'r'.
      #
      # Note that this is really something deep in OSX because running
      #  ls Foo.framework/Versions/A
      # will print out 'Resources', while file_path.get_native_path_case()
      # returns a lower case 'r'.
      #
      # So *something* is happening under the hood resulting in the command 'ls'
      # and Carbon.File.FSPathMakeRef('path').FSRefMakePath() to disagree.  We
      # have no idea why.
      if sys.platform != 'darwin':
        raise MappingError(
            'File path doesn\'t equal native file path\n%s != %s' %
            (filepath, native_filepath))

  symlinks = []
  if follow_symlinks:
    relfile, symlinks = expand_symlinks(indir, relfile)

  if relfile.endswith(os.path.sep):
    if not os.path.isdir(infile):
      raise MappingError(
          '%s is not a directory but ends with "%s"' % (infile, os.path.sep))

    # Special case './'.
    if relfile.startswith('.' + os.path.sep):
      relfile = relfile[2:]
    # Start from any symlinks found while resolving, then append contents.
    outfiles = symlinks
    try:
      for filename in os.listdir(infile):
        inner_relfile = os.path.join(relfile, filename)
        if blacklist and blacklist(inner_relfile):
          continue
        if os.path.isdir(os.path.join(indir, inner_relfile)):
          inner_relfile += os.path.sep
        outfiles.extend(
            expand_directory_and_symlink(indir, inner_relfile, blacklist,
                                         follow_symlinks))
      return outfiles
    except OSError as e:
      raise MappingError(
          'Unable to iterate over directory %s.\n%s' % (infile, e))
  else:
    # Always add individual files even if they were blacklisted.
    if os.path.isdir(infile):
      raise MappingError(
          'Input directory %s must have a trailing slash' % infile)

    if not os.path.isfile(infile):
      raise MappingError('Input file %s doesn\'t exist' % infile)

    return symlinks + [relfile]
1363
1364
def process_input(filepath, prevdict, read_only, flavor, algo):
  """Processes an input file, a dependency, and return meta data about it.

  Behaviors:
  - Retrieves the file mode, file size, file timestamp, file link
    destination if it is a file link and calcultate the SHA-1 of the file's
    content if the path points to a file and not a symlink.

  Arguments:
    filepath: File to act on.
    prevdict: the previous dictionary. It is used to retrieve the cached sha-1
              to skip recalculating the hash. Optional.
    read_only: If 1 or 2, the file mode is manipulated. In practice, only save
               one of 4 modes: 0755 (rwx), 0644 (rw), 0555 (rx), 0444 (r). On
               windows, mode is not set since all files are 'executable' by
               default.
    flavor: One isolated flavor, like 'linux', 'mac' or 'win'.
    algo: Hashing algorithm used.

  Returns:
    The necessary data to create a entry in the 'files' section of an .isolated
    file: keys 'm' (mode), 't' (mtime), 's' (size), 'h' (hash) for regular
    files, or 't' and 'l' (link destination) for symlinks.

  Raises:
    MappingError if |filepath| does not exist.
  """
  out = {}
  # TODO(csharp): Fix crbug.com/150823 and enable the touched logic again.
  # if prevdict.get('T') == True:
  #   # The file's content is ignored. Skip the time and hard code mode.
  #   if get_flavor() != 'win':
  #     out['m'] = stat.S_IRUSR | stat.S_IRGRP
  #   out['s'] = 0
  #   out['h'] = algo().hexdigest()
  #   out['T'] = True
  #   return out

  # Always check the file stat and check if it is a link. The timestamp is used
  # to know if the file's content/symlink destination should be looked into.
  # E.g. only reuse from prevdict if the timestamp hasn't changed.
  # There is the risk of the file's timestamp being reset to its last value
  # manually while its content changed. We don't protect against that use case.
  try:
    # lstat so symlinks are described, not their targets.
    filestats = os.lstat(filepath)
  except OSError:
    # The file is not present.
    raise MappingError('%s is missing' % filepath)
  is_link = stat.S_ISLNK(filestats.st_mode)

  if flavor != 'win':
    # Ignore file mode on Windows since it's not really useful there.
    filemode = stat.S_IMODE(filestats.st_mode)
    # Remove write access for group and all access to 'others'.
    filemode &= ~(stat.S_IWGRP | stat.S_IRWXO)
    if read_only:
      filemode &= ~stat.S_IWUSR
    # Keep group execute in sync with owner execute.
    if filemode & stat.S_IXUSR:
      filemode |= stat.S_IXGRP
    else:
      filemode &= ~stat.S_IXGRP
    if not is_link:
      out['m'] = filemode

  # Used to skip recalculating the hash or link destination. Use the most recent
  # update time.
  # TODO(maruel): Save it in the .state file instead of .isolated so the
  # .isolated file is deterministic.
  out['t'] = int(round(filestats.st_mtime))

  if not is_link:
    out['s'] = filestats.st_size
    # If the timestamp wasn't updated and the file size is still the same, carry
    # on the sha-1.
    if (prevdict.get('t') == out['t'] and
        prevdict.get('s') == out['s']):
      # Reuse the previous hash if available.
      out['h'] = prevdict.get('h')
    if not out.get('h'):
      out['h'] = hash_file(filepath, algo)
  else:
    # If the timestamp wasn't updated, carry on the link destination.
    if prevdict.get('t') == out['t']:
      # Reuse the previous link destination if available.
      out['l'] = prevdict.get('l')
    if out.get('l') is None:
      # The link could be in an incorrect path case. In practice, this only
      # happen on OSX on case insensitive HFS.
      # TODO(maruel): It'd be better if it was only done once, in
      # expand_directory_and_symlink(), so it would not be necessary to do again
      # here.
      symlink_value = os.readlink(filepath)  # pylint: disable=E1101
      filedir = file_path.get_native_path_case(os.path.dirname(filepath))
      native_dest = file_path.fix_native_path_case(filedir, symlink_value)
      out['l'] = os.path.relpath(native_dest, filedir)
  return out
1457
1458
def save_isolated(isolated, data):
  """Serializes |data| out to the .isolated file path |isolated|.

  A full implementation could split the content into child .isolated files;
  this reference implementation never does, so the returned list of children
  is always empty.

  Returns the list of child isolated files that are included by |isolated|.
  """
  # Round-trip the dict through the validator so corrupt data is rejected
  # before anything is written to disk.
  hash_algo = SUPPORTED_ALGOS[data['algo']]
  load_isolated(json.dumps(data), data.get('flavor'), hash_algo)
  tools.write_json(isolated, data, True)
  return []
1472
1473
1474
def upload_tree(base_url, indir, infiles, namespace):
  """Pushes a tree of files to the isolate server.

  Arguments:
    base_url: Server root; |base_url|/has/ is expected to answer presence
              queries and |base_url|/store/ to accept new content.
    indir: Directory every path in |infiles| is relative to.
    infiles: mapping of relative path -> metadata for the files to upload.
    namespace: Namespace on the server under which the content is stored.
  """
  # The storage object owns the network connections; the context manager
  # guarantees they are released even on failure.
  with get_storage(base_url, namespace) as storage:
    storage.upload_tree(indir, infiles)
  return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001489
1490
def load_isolated(content, os_flavor, algo):
  """Verifies the .isolated file is valid and loads this object with the json
  data.

  Arguments:
  - content: raw serialized content to load.
  - os_flavor: OS to load this file on. Optional.
  - algo: hashlib algorithm class. Used to confirm the algorithm matches the
        algorithm used on the Isolate Server. When None, defaults to the
        algorithm declared in the file itself ('sha-1' if unspecified).

  Returns:
    The decoded dict, with file paths normalized to the native os.path.sep.

  Raises:
    ConfigError if the content is not valid .isolated data.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  # Check 'version' first, since it could modify the parsing after.
  # TODO(maruel): Drop support for unversioned .isolated file around Jan 2014.
  value = data.get('version', ISOLATED_FILE_VERSION)
  if not isinstance(value, basestring):
    raise ConfigError('Expected string, got %r' % value)
  if not re.match(r'^(\d+)\.(\d+)$', value):
    raise ConfigError('Expected a compatible version, got %r' % value)
  # Only the major version number is significant for compatibility.
  if value.split('.', 1)[0] != ISOLATED_FILE_VERSION.split('.', 1)[0]:
    raise ConfigError(
        'Expected compatible \'%s\' version, got %r' %
        (ISOLATED_FILE_VERSION, value))

  if algo is None:
    # TODO(maruel): Remove the default around Jan 2014.
    # Default the algorithm used in the .isolated file itself, falls back to
    # 'sha-1' if unspecified.
    algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]

  for key, value in data.iteritems():
    if key == 'algo':
      # Must name a supported algorithm and agree with the caller's |algo|.
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
      if value not in SUPPORTED_ALGOS:
        raise ConfigError(
            'Expected one of \'%s\', got %r' %
            (', '.join(sorted(SUPPORTED_ALGOS)), value))
      if value != SUPPORTED_ALGOS_REVERSE[algo]:
        raise ConfigError(
            'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))

    elif key == 'command':
      # Non-empty list of strings.
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      # Map of relative path -> properties dict with keys 'l' (symlink
      # destination), 'm' (file mode), 'h' (content hash), 's' (size).
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            if not is_valid_hash(subsubvalue, algo):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            if not isinstance(subsubvalue, (int, long)):
              raise ConfigError('Expected int or long, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        # An entry is either a real file ('h' and 's' together) or a symlink
        # ('l'); the two forms are mutually exclusive and a symlink carries no
        # mode.
        if bool('h' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
              subvalue)
        if bool('h' in subvalue) != bool('s' in subvalue):
          raise ConfigError(
              'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
              subvalue)
        if bool('s' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
              subvalue)
        if bool('l' in subvalue) and bool('m' in subvalue):
          raise ConfigError(
              'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      # Non-empty list of hashes of child .isolated files.
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not is_valid_hash(subvalue, algo):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if value not in (0, 1, 2):
        raise ConfigError('Expected 0, 1 or 2, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      if os_flavor and value != os_flavor:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (os_flavor, value))

    elif key == 'version':
      # Already checked above.
      pass

    else:
      raise ConfigError('Unknown key %r' % key)

  # Automatically fix os.path.sep if necessary. While .isolated files are
  # always in the native path format, someone could want to download an
  # .isolated tree from another OS.
  wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
  if 'files' in data:
    data['files'] = dict(
        (k.replace(wrong_path_sep, os.path.sep), v)
        for k, v in data['files'].iteritems())
    for v in data['files'].itervalues():
      if 'l' in v:
        v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
  if 'relative_cwd' in data:
    data['relative_cwd'] = data['relative_cwd'].replace(
        wrong_path_sep, os.path.sep)
  return data
1633
1634
class IsolatedFile(object):
  """Represents a single parsed .isolated file."""
  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)' % obj_hash)
    self.obj_hash = obj_hash
    self.algo = algo
    # Becomes True only once every .isolated file on the 'left' of this one in
    # the include tree has been parsed; files must be fetched in that strict
    # order so that overridden entries are never downloaded needlessly.
    self.can_fetch = False

    # Raw decoded json content.
    self.data = {}
    # One IsolatedFile instance per entry in self.data['includes'].
    self.children = []

    # Flipped once load() ran successfully.
    self._is_parsed = False
    # Flipped once fetch_files() enqueued every file of this node.
    self.files_fetched = False

  def load(self, os_flavor, content):
    """Verifies the .isolated file is valid and loads this object with the json
    data.
    """
    logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    included = self.data.get('includes', [])
    self.children = [IsolatedFile(digest, self.algo) for digest in included]
    self._is_parsed = True

  def fetch_files(self, fetch_queue, files):
    """Adds files in this .isolated file not present in |files| dictionary.

    Preemptively request files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if not self._is_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for relpath, props in self.data.get('files', {}).iteritems():
      # The root .isolated wins on conflicting paths: an entry already present
      # in |files| was overridden by an embedding .isolated file, so it must
      # neither be replaced nor fetched.
      if relpath in files:
        continue
      files[relpath] = props
      if 'h' in props:
        # Preemptively request files.
        logging.debug('fetching %s' % relpath)
        fetch_queue.add(WorkerPool.MED, props['h'], props['s'])
    self.files_fetched = True
1692
1693
class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    # Command to run; taken from the first loaded .isolated that defines one.
    self.command = []
    # Union of the 'files' maps of every loaded .isolated file.
    self.files = {}
    # Value of the 'read_only' key; stays None until a loaded file sets it.
    self.read_only = None
    # Directory the command runs from, relative to the output directory.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      # Schedules the fetch of one .isolated file, refusing any hash that was
      # already requested (a cycle in the include graph).
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(WorkerPool.HIGH, h)

    retrieve(self.root)

    while pending:
      # Block until any pending .isolated file is in cache, then parse it and
      # schedule the fetch of its own includes.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    # Sanity check: every node of the include tree must have had its files
    # enqueued by the time there is nothing pending.
    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    self.relative_cwd = self.relative_cwd or ''

  def _traverse_tree(self, fetch_queue, node):
    """Recursively enqueues the files of every fetchable node.

    Unblocks at most one new child per pass (the leftmost non-fetchable one),
    which preserves the strict 'includes' ordering described in load().
    """
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    """Enqueues |node|'s files and folds its properties into this Settings.

    For command, read_only and relative_cwd, the first node that defines the
    property wins; later nodes cannot override it.
    """
    node.fetch_files(fetch_queue, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1788
1789
def fetch_isolated(
    isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
  """Aggressively downloads the .isolated file(s), then downloads all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    algo: hash algorithm to use.
    outdir: Output directory to map file tree to.
    os_flavor: OS flavor to choose when reading sections of *.isolated file.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about loaded *.isolated file.

  Raises:
    ConfigError: if |require_command| is True and no command was found.
    MappingError: if the cache evicted items that were just fetched.
  """
  # |cache| is used as a context manager; presumably it persists its state on
  # exit -- confirm against the LocalCache implementation.
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      # |isolated_hash| may be a path instead of a hash; in that case the file
      # is injected into the cache and its actual hash is used from there on.
      if not is_valid_hash(isolated_hash, algo):
        isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)

      # Load all *.isolated and start loading rest of the files.
      settings.load(fetch_queue, isolated_hash, os_flavor, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise ConfigError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_symlinks(outdir, settings.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props). Several paths may
      # share the same content, hence the list.
      remaining = {}
      for filepath, props in settings.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise MappingError('Cache is too small to hold all requested files')
  return settings
1868
1869
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  relpaths = expand_directory_and_symlink(root, './', blacklist, True)
  metadata = {}
  for relpath in relpaths:
    meta = process_input(
        os.path.join(root, relpath), {}, False, sys.platform, algo)
    # The mtime ('t') is a local cache optimization and has no place in
    # metadata meant to be uploaded.
    meta.pop('t')
    metadata[relpath] = meta
  items = []
  for relpath, meta in metadata.iteritems():
    # Entries without a content hash ('h') are symlinks; only real files are
    # turned into uploadable FileItems.
    if 'h' not in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            is_isolated=relpath.endswith('.isolated')))
  return items, metadata
1890
1891
def archive_files_to_storage(storage, algo, files, blacklist):
  """Hashes and uploads every entry of |files|; returns (hash, path) pairs.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    algo: an hashlib class to hash content. Usually hashlib.sha1.
    files: list of file paths to upload. If a directory is specified, a
           .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(set(map(os.path.abspath, files))) != len(files):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # A directory is archived as the set of the files it contains plus a
          # generated .isolated file referencing them all.
          items, metadata = directory_to_metadata(filepath, algo, blacklist)

          # Create the .isolated file.
          if tempdir is None:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          save_isolated(
              isolated,
              {
                'algo': SUPPORTED_ALGOS_REVERSE[algo],
                'files': metadata,
                'version': ISOLATED_FILE_VERSION,
              })
          h = hash_file(isolated, algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  is_isolated=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = hash_file(filepath, algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  is_isolated=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded but we don't
    # much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    if tempdir:
      shutil.rmtree(tempdir)
1960
1961
def archive(out, namespace, files, blacklist):
  """Archives |files| to |out| and prints one '<hash> <path>' line per entry.

  Arguments:
    out: isolate server url (or local directory) to store the content in.
    namespace: namespace to use on the server.
    files: list of paths; a lone '-' means read one path per line from stdin.
    blacklist: list of regexps of files to skip when walking directories.

  Raises:
    Error if there is nothing to upload or an entry cannot be processed.
  """
  if files == ['-']:
    # readlines() keeps the trailing newline, which would make every path fail
    # the os.path.isdir()/isfile() checks downstream; strip it and ignore
    # blank lines.
    files = [line.rstrip('\r\n') for line in sys.stdin.readlines()
             if line.strip()]

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  algo = get_hash_algo(namespace)
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, algo, files, blacklist)
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1975
1976
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser, False)
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    # Surface expected failures as a clean command line error instead of a
    # traceback.
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002002
2003
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser, True)
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # --isolated and --file are mutually exclusive and one is required.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)

  # Either the server url or the local hashtable directory backs the storage.
  remote = options.isolate_server or options.indir
  with get_storage(remote, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            WorkerPool.MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      settings = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=MemoryCache(),
          algo=get_hash_algo(options.namespace),
          outdir=options.target,
          os_flavor=None,
          require_command=False)
      rel = os.path.join(options.target, settings.relative_cwd)
      # NOTE(review): |rel| already contains |options.target| (and is absolute
      # since options.target was made absolute above), so joining them again is
      # redundant -- os.path.join returns |rel| unchanged. Consider printing
      # |rel| directly.
      print('To run this test please run from the directory %s:' %
          os.path.join(options.target, rel))
      print('  ' + ' '.join(settings.command))

  return 0
2065
2066
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDhashtable(parser, args):
  """Archives data to a hashtable on the file system.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_outdir_options(parser)
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)
  process_outdir_options(parser, options, os.getcwd())
  try:
    # Do not compress files when archiving to the file system.
    archive(options.outdir, 'default', files, options.blacklist)
  except Error as e:
    # Surface expected failures as a clean command line error instead of a
    # traceback.
    parser.error(e.args[0])
  return 0
2093
2094
def add_isolate_server_options(parser, add_indir):
  """Registers --isolate-server and --namespace on |parser|.

  When |add_indir| is True, --indir is registered as well so a local directory
  can stand in for the server.
  """
  default_server = os.environ.get('ISOLATE_SERVER', '')
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=default_server,
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
  if not add_indir:
    return
  parser.add_option(
      '--indir', metavar='DIR',
      help='Directory used to store the hashtable instead of using an '
           'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002114
2115
def process_isolate_server_options(parser, options):
  """Validates --isolate-server / --indir and normalizes their values.

  Aborts via parser.error() unless exactly one of the two is usable (--indir is
  only considered when the option was registered on the parser at all).
  """
  indir_supported = hasattr(options, 'indir')

  if options.isolate_server:
    if indir_supported and options.indir:
      parser.error('Use only one of --indir or --isolate-server.')
    # Default the scheme to https when none was given.
    parts = urlparse.urlparse(options.isolate_server, 'https')
    if parts.query:
      parser.error('--isolate-server doesn\'t support query parameter.')
    if parts.fragment:
      parser.error('--isolate-server doesn\'t support fragment in the url.')
    # urlparse('foo.com') yields netloc='', path='foo.com', which is not what
    # is desired here: move the host portion over to netloc.
    pieces = list(parts)
    if not pieces[1] and pieces[2]:
      pieces[1] = pieces[2].rstrip('/')
      pieces[2] = ''
    # Drop any trailing slash so the URL is in canonical form.
    pieces[2] = pieces[2].rstrip('/')
    options.isolate_server = urlparse.urlunparse(pieces)
    return

  # No isolate server: --indir must be present and valid.
  if not indir_supported:
    parser.error('--isolate-server is required.')
  if not options.indir:
    parser.error('Use one of --indir or --isolate-server.')
  if file_path.is_url(options.indir):
    parser.error('Can\'t use an URL for --indir.')
  # Convert to a native absolute path.
  options.indir = unicode(options.indir).replace('/', os.path.sep)
  options.indir = os.path.abspath(
      os.path.normpath(os.path.join(os.getcwd(), options.indir)))
  if not os.path.isdir(options.indir):
    parser.error('Path given to --indir must exist.')
2153
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002154
2155
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002156def add_outdir_options(parser):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002157 """Adds --outdir, which is orthogonal to --isolate-server.
2158
2159 Note: On upload, separate commands are used between 'archive' and 'hashtable'.
2160 On 'download', the same command can download from either an isolate server or
2161 a file system.
2162 """
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002163 parser.add_option(
2164 '-o', '--outdir', metavar='DIR',
2165 help='Directory used to recreate the tree.')
2166
2167
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002168def process_outdir_options(parser, options, cwd):
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002169 if not options.outdir:
2170 parser.error('--outdir is required.')
2171 if file_path.is_url(options.outdir):
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002172 parser.error('Can\'t use an URL for --outdir.')
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002173 options.outdir = unicode(options.outdir).replace('/', os.path.sep)
2174 # outdir doesn't need native path case since tracing is never done from there.
2175 options.outdir = os.path.abspath(
2176 os.path.normpath(os.path.join(cwd, options.outdir)))
2177 # In theory, we'd create the directory outdir right away. Defer doing it in
2178 # case there's errors in the command line.
2179
2180
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002181class OptionParserIsolateServer(tools.OptionParserWithLogging):
2182 def __init__(self, **kwargs):
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002183 tools.OptionParserWithLogging.__init__(
2184 self,
2185 version=__version__,
2186 prog=os.path.basename(sys.modules[__name__].__file__),
2187 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002188 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002189
2190 def parse_args(self, *args, **kwargs):
2191 options, args = tools.OptionParserWithLogging.parse_args(
2192 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002193 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002194 return options, args
2195
2196
def main(args):
  """Dispatches |args| to the requested subcommand; returns the exit code."""
  try:
    return subcommand.CommandDispatcher(__name__).execute(
        OptionParserIsolateServer(), args)
  except Exception as exc:
    # Any unexpected failure is reported and mapped to a non-zero exit code.
    tools.report_error(exc)
    return 1
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002204
2205
if __name__ == '__main__':
  # Normalize stream/filesystem encoding handling (depot_tools helper) before
  # anything is printed.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # Initializes terminal color support; on Windows this translates ANSI escape
  # sequences emitted by the tool.
  colorama.init()
  sys.exit(main(sys.argv[1:]))