blob: f84324f1ddfaf1732f8249be1962aebe445e6bfe [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05006"""Archives a set of files or directories to a server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05008__version__ = '0.3.2'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import hashlib
maruel@chromium.org41601642013-09-18 19:40:46 +000012import json
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050016import shutil
17import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050019import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000020import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000022import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050023import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000024import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000025
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000026from third_party import colorama
27from third_party.depot_tools import fix_encoding
28from third_party.depot_tools import subcommand
29
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050030from utils import file_path
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000031from utils import net
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000032from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000033from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000034
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080035import auth
36
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000037
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'
# Version stored and expected in .isolated files.
ISOLATED_FILE_VERSION = '1.3'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
    'md5': hashlib.md5,
    'sha-1': hashlib.sha1,
    'sha-512': hashlib.sha512,
}


# Used for serialization: reverse lookup from hashlib constructor back to the
# canonical algorithm name above.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())


# Regexps of paths that should never be archived by default.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
    r'^.+\.(?:run_test_cases)$',
    r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
120
121
class Error(Exception):
  """Generic runtime error."""
  # NOTE: the redundant 'pass' after the docstring was removed; the docstring
  # alone is a valid class body.
125
126
class ConfigError(ValueError):
  """Generic failure to load a .isolated file."""
  # NOTE: the redundant 'pass' after the docstring was removed; the docstring
  # alone is a valid class body.
130
131
class MappingError(OSError):
  """Failed to recreate the tree."""
  # NOTE: the redundant 'pass' after the docstring was removed; the docstring
  # alone is a valid class body.
135
136
def is_valid_hash(value, algo):
  """Returns if the value is a valid hash for the corresponding algorithm.

  Arguments:
    value: hex digest string to validate.
    algo: hashlib constructor (e.g. hashlib.sha1); its digest size determines
        the expected length of |value|.
  """
  size = 2 * algo().digest_size
  # Use \Z rather than $: $ also matches just before a trailing newline, so
  # the old pattern accepted e.g. 40 hex chars followed by '\n'.
  return bool(re.match(r'^[a-fA-F0-9]{%d}\Z' % size, value))
141
142
def hash_file(filepath, algo):
  """Calculates the hash of a file without reading it all in memory at once.

  |algo| should be one of hashlib hashing algorithm.
  """
  hasher = algo()
  with open(filepath, 'rb') as f:
    # Read in DISK_FILE_CHUNK sized pieces to keep memory usage bounded.
    piece = f.read(DISK_FILE_CHUNK)
    while piece:
      hasher.update(piece)
      piece = f.read(DISK_FILE_CHUNK)
  return hasher.hexdigest()
156
157
def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  # Keep pulling until the stream reports end-of-data (empty read).
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
165
166
def file_read(filepath, chunk_size=DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as f:
    # Skip the first |offset| bytes when requested.
    if offset:
      f.seek(offset)
    piece = f.read(chunk_size)
    while piece:
      yield piece
      piece = f.read(chunk_size)
177
178
def file_write(filepath, content_generator):
  """Writes the chunks yielded by |content_generator| to |filepath|.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent_dir = os.path.dirname(filepath)
  if not os.path.isdir(parent_dir):
    os.makedirs(parent_dir)
  written = 0
  with open(filepath, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000197
198
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    # compressobj buffers internally; it may emit nothing for small inputs.
    out = deflater.compress(piece)
    if out:
      yield out
  # Flush whatever is still buffered plus the zlib stream trailer.
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
209
210
def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      # The second argument (max_length) caps output at |chunk_size| bytes;
      # input that wasn't consumed yet ends up in .unconsumed_tail.
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # Drain the leftover input in bounded slices before reading more.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    # Emit any output still buffered inside the decompressor.
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
240
241
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store, no compression) for file types that are already
  compressed, 7 otherwise.
  """
  # os.path.splitext() keeps the leading dot (e.g. '.png') while
  # ALREADY_COMPRESSED_TYPES stores bare extensions (e.g. 'png'); strip the
  # dot so the membership test can actually match. Without this, already
  # compressed files were needlessly recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
247
248
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory that has to exist.
  to_create = set()
  for relpath in files:
    parent = os.path.dirname(relpath)
    while parent and parent not in to_create:
      to_create.add(parent)
      parent = os.path.dirname(parent)
  # Lexicographic order guarantees parents are created before children.
  for rel_dir in sorted(to_create):
    os.mkdir(os.path.join(base_directory, rel_dir))
261
262
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    # Only entries carrying an 'l' (link target) property are symlinks.
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    destination = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000275
276
def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  # UNKNOWN_FILE_SIZE means there is nothing to compare against: just check
  # the file exists.
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if actual_size == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
291
292
class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # How many times a task raising IOError is retried before giving up.
  RETRIES = 5

  def __init__(self):
    # Positional arguments mirror AutoRetryThreadPool's signature: exceptions
    # to retry on, retry count, initial/max worker counts, then two more
    # arguments (0 and the pool name 'remote').
    # NOTE(review): semantics of the trailing 0 are not visible from this
    # file - confirm against utils/threading_utils.py before changing it.
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000310
311
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Hex digest of the content; None until computed by prepare().
    self.digest = digest
    # Content length in bytes; None until computed by prepare().
    self.size = size
    # High priority items are pushed via higher priority thread pool tasks.
    self.high_priority = high_priority
    # Default zlib compression level; subclasses may override per file type.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    total_len = 0
    for piece in self.content():
      hasher.update(piece)
      total_len += len(piece)
    self.digest = hasher.hexdigest()
    self.size = total_len
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000351
352
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Only stat the file when the caller didn't already supply the size.
    if size is None:
      size = os.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    # Already-compressed formats are stored rather than deflated again.
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    """Lazily yields the file content chunk by chunk."""
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000370
371
class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Size is known up front; digest is computed lazily by prepare().
    super(BufferItem, self).__init__(
        digest=None, size=len(buf), high_priority=high_priority)
    self.buffer = buf

  def content(self):
    """Returns the whole buffer as a single chunk."""
    return [self.buffer]
381
382
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe.
  """

  def __init__(self, storage_api, use_zip, hash_algo):
    # True if content is zlib-compressed before upload / after download.
    self.use_zip = use_zip
    # hashlib constructor used by Item.prepare() to compute digests.
    self.hash_algo = hash_algo
    # Underlying StorageApi instance that talks to the server.
    self._storage_api = storage_api
    # Thread pools are created lazily via the properties below.
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self.hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns an URL that can be used to fetch given item once it's uploaded.

    Note that if namespace uses compression, data at given URL is compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    # Digest must be known before the URL can be built.
    item.prepare(self.hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = WorkerPool.HIGH if item.high_priority else WorkerPool.MED

    def push(content):
      """Pushes an item and returns it to |channel|."""
      item.prepare(self.hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Propagate the failure to whoever is pulling from |channel|.
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self.use_zip:
          stream = zip_decompress(stream, DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self.hash_algo)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000657
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000658
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_index = 0
  limit = ITEMS_PER_CONTAINS_QUERIES[0]
  pending = []
  # Largest files first: they are the most likely to have changed, so probing
  # them early starts their (long) uploads sooner.
  for entry in sorted(items, key=lambda x: x.size, reverse=True):
    pending.append(entry)
    if len(pending) == limit:
      yield pending
      pending = []
      batch_index += 1
      # Batch sizes grow per ITEMS_PER_CONTAINS_QUERIES; the last value is
      # reused once the schedule is exhausted.
      limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_index, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if pending:
    yield pending
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000685
686
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  Manages multiple concurrent fetch operations, acting as a bridge between
  Storage and LocalCache so that neither needs to know about the other.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    # Completion notifications from async fetches arrive on this channel.
    self._channel = threading_utils.TaskChannel()
    # Digests with a fetch currently in flight.
    self._pending = set()
    # Every digest ever requested through 'add'; checked by verify_all_cached.
    self._accessed = set()
    # Digests known to be present in the cache.
    self._fetched = cache.cached_set()

  def add(self, digest, size=UNKNOWN_FILE_SIZE, priority=WorkerPool.MED):
    """Starts asynchronous fetch of item |digest|."""
    # A fetch for this digest is already in flight.
    if digest in self._pending:
      return

    # Record the item was needed; verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already in cache? Just bump its LRU position ('touch' also validates).
    if digest in self._fetched:
      if self.cache.touch(digest, size):
        return
      # The cached copy is corrupted: drop it and fall through to refetch.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Kick off the asynchronous fetch.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Blocks until at least one of |digests| has been retrieved.

    Returns the digest of the first item that became available.
    """
    # Maybe one of them is already fetched.
    for candidate in digests:
      if candidate in self._fetched:
        return candidate

    # Everything else must have been scheduled via 'add' beforehand.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Drain completion notifications until a requested digest shows up.
    while self._pending:
      done = self._channel.pull()
      self._pending.remove(done)
      self._fetched.add(done)
      if done in digests:
        return done

    # Unreachable: the assert above guarantees a requested item is pending.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as handle:
      blob = handle.read()
    digest = algo(blob).hexdigest()
    self.cache.write(digest, [blob])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Number of items still being fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if every item ever requested is present in the cache."""
    return self._accessed.issubset(self.cache.cached_set())
778
779
class FetchStreamVerifier(object):
  """Validates a fetched byte stream before handing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    # Iterable producing the data chunks to verify.
    self.stream = stream
    # Expected total length, or UNKNOWN_FILE_SIZE to skip the size check.
    self.expected_size = expected_size
    # Number of bytes accounted for so far.
    self.current_size = 0

  def run(self):
    """Generator that yields the same chunks as |stream|.

    Keeps one chunk buffered so the whole stream can be validated before the
    final chunk reaches the consumer. An IOError thrown into the generator by
    the consumer is converted into MappingError, since otherwise Storage would
    retry the fetch on unrelated local cache errors.
    """
    previous = None
    for incoming in self.stream:
      assert incoming is not None
      if previous is None:
        # First chunk: just buffer it.
        previous = incoming
        continue
      self._inspect_chunk(previous, is_last=False)
      try:
        yield previous
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)
      previous = incoming
    if previous is None:
      # Empty stream: nothing to emit.
      return
    self._inspect_chunk(previous, is_last=True)
    try:
      yield previous
    except IOError as exc:
      raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Accounts for |chunk| and validates the total size on the last one."""
    self.current_size += len(chunk)
    size_known = self.expected_size != UNKNOWN_FILE_SIZE
    if is_last and size_known and self.expected_size != self.current_size:
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
823
824
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, data that should be pushed
    and data provided by the item is not the same. In that case |content| is
    not None and it yields chunks of compressed data (using item.content() as
    a source of original uncompressed data). This is implemented by Storage
    class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()
896
897
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800898class _IsolateServerPushState(object):
899 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -0500900
901 Note this needs to be a global class to support pickling.
902 """
903
904 def __init__(self, upload_url, finalize_url):
905 self.upload_url = upload_url
906 self.finalize_url = finalize_url
907 self.uploaded = False
908 self.finalized = False
909
910
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self.base_url = base_url.rstrip('/')
    self.namespace = namespace
    # Guards lazy initialization of |_server_caps| in _server_capabilities.
    self._lock = threading.Lock()
    # Cached result of the /handshake call; populated on first access.
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response.

    Raises:
      MappingError if the server reported an error.
      ValueError/KeyError if the response is malformed (converted to
      MappingError by the caller).
    """
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.
    with self._lock:
      # Only the first caller performs the handshake; others reuse the result.
      if self._server_caps is None:
        request_body = json.dumps(
            self._generate_handshake_request(), separators=(',', ':'))
        response = net.url_read(
            url=self.base_url + '/content-gs/handshake',
            data=request_body,
            content_type='application/json',
            method='POST')
        if response is None:
          raise MappingError('Failed to perform handshake.')
        try:
          caps = json.loads(response)
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise MappingError('Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  def get_fetch_url(self, digest):
    """See StorageApi.get_fetch_url."""
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self.base_url, self.namespace, digest)

  def fetch(self, digest, offset=0):
    """See StorageApi.fetch."""
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        source_url,
        retry_404=True,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    """See StorageApi.push."""
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content

    # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, push_state.upload_url))
      push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      response = net.url_read(
          url=push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      push_state.finalized = True

  def contains(self, items):
    """See StorageApi.contains."""
    logging.info('Checking existence of %d files...', len(items))

    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.high_priority),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self.base_url,
        self.namespace,
        urllib.quote(self._server_capabilities['access_token']))
    response_body = net.url_read(
        url=query_url,
        data=json.dumps(body, separators=(',', ':')),
        content_type='application/json',
        method='POST')
    if response_body is None:
      raise MappingError('Failed to execute /pre-upload query')

    # Response body is a list of push_urls (or null if file is already present).
    try:
      response = json.loads(response_body)
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response_body))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        missing_items[items[i]] = _IsolateServerPushState(
            push_urls[0], push_urls[1])
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001148
1149
class FileSystem(StorageApi):
  """StorageApi implementation that fetches data from the file system.

  The common use case is a NFS/CIFS file server that is mounted locally and
  used to fetch files onto a local partition.
  """

  # Sentinel used as push_state instead of None, so that callers are forced
  # to call 'contains' before 'push'. Naively passing None to 'push' will not
  # work.
  _DUMMY_PUSH_STATE = object()

  def __init__(self, base_path):
    super(FileSystem, self).__init__()
    # Directory holding one file per digest.
    self.base_path = base_path

  def get_fetch_url(self, digest):
    # Plain file system storage has no URL based access.
    return None

  def fetch(self, digest, offset=0):
    assert isinstance(digest, basestring)
    path = os.path.join(self.base_path, digest)
    return file_read(path, offset=offset)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert push_state is self._DUMMY_PUSH_STATE
    if content is None:
      content = item.content()
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]
    file_write(os.path.join(self.base_path, item.digest), content)

  def contains(self, items):
    assert all(i.digest is not None and i.size is not None for i in items)
    missing = {}
    for entry in items:
      if not os.path.exists(os.path.join(self.base_path, entry.digest)):
        missing[entry] = self._DUMMY_PUSH_STATE
    return missing
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001189
1190
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  # NOTE(review): presumably set by disk-backed subclasses to the cache
  # directory path; stays None for purely in-memory implementations — confirm.
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    # Never suppresses exceptions raised inside the 'with' block.
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()
1242
1243
class MemoryCache(LocalCache):
  """LocalCache implementation that keeps all items in memory."""

  def __init__(self):
    super(MemoryCache, self).__init__()
    # Protects |_contents|; dict operations are not assumed thread safe.
    self._lock = threading.Lock()
    # Maps digest -> item body as a single str.
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    # In-memory items can't get corrupted, so presence is enough.
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Assemble the whole stream before grabbing the lock to keep the critical
    # section short.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data

  def hardlink(self, digest, dest, file_mode):
    """Data lives in memory, so there is no file node to hardlink; copy it."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      # Only keep the owner's read/execute bits, ignore all other bits.
      os.chmod(dest, file_mode & 0o500)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001281
1282
def get_hash_algo(_namespace):
  """Returns the hashing class used for items uploaded to given |_namespace|.

  Currently always sha1, regardless of the namespace.
  """
  # TODO(vadimsh): Implement this at some point.
  return hashlib.sha1
1287
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001288
def is_namespace_with_compression(namespace):
  """True if objects in |namespace| are stored in compressed form."""
  compressed_suffixes = ('-gzip', '-deflate')
  return namespace.endswith(compressed_suffixes)
1292
1293
def get_storage_api(file_or_url, namespace):
  """Returns an object implementing the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients; see 'get_storage' for a better
  alternative.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of
        isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of a StorageApi subclass (FileSystem or IsolateServer).
  """
  if not file_path.is_url(file_or_url):
    return FileSystem(file_or_url)
  return IsolateServer(file_or_url, namespace)
1315
1316
def get_storage(file_or_url, namespace):
  """Returns a Storage instance that can upload and download from |namespace|.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of
        isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  storage_api = get_storage_api(file_or_url, namespace)
  use_compression = is_namespace_with_compression(namespace)
  hash_algo = get_hash_algo(namespace)
  return Storage(storage_api, use_compression, hash_algo)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001334
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001335
def expand_symlinks(indir, relfile):
  """Follows symlinks in |relfile|, but treating symlinks that point outside the
  build tree as if they were ordinary directories/files. Returns the final
  symlink-free target and a list of paths to symlinks encountered in the
  process.

  The rule about symlinks outside the build tree is for the benefit of the
  Chromium OS ebuild, which symlinks the output directory to an unrelated path
  in the chroot.

  Fails when a directory loop is detected, although in theory we could support
  that case.

  Arguments:
    indir: base directory; |relfile| is interpreted relative to it.
        NOTE(review): presumably already in native path case — confirm.
    relfile: path relative to |indir| to expand.

  Returns:
    Tuple (relfile, symlinks): the symlink-free relative path (with a trailing
    separator preserved if the input had one) and the list of symlink paths
    (relative to |indir|) traversed inside the tree.
  """
  # Remember whether the caller denoted a directory (trailing separator), so
  # the same form can be restored on the returned path.
  is_directory = relfile.endswith(os.path.sep)
  # |done| is the already-resolved absolute prefix; |todo| is the remaining
  # relative portion still to be examined for symlinks.
  done = indir
  todo = relfile.strip(os.path.sep)
  symlinks = []

  while todo:
    # Find the first symlink component (if any) in |todo| relative to |done|.
    pre_symlink, symlink, post_symlink = file_path.split_at_symlink(
        done, todo)
    if not symlink:
      # No symlink left in the remaining path: normalize its case and finish.
      todo = file_path.fix_native_path_case(done, todo)
      done = os.path.join(done, todo)
      break
    symlink_path = os.path.join(done, pre_symlink, symlink)
    post_symlink = post_symlink.lstrip(os.path.sep)
    # readlink doesn't exist on Windows.
    # pylint: disable=E1101
    target = os.path.normpath(os.path.join(done, pre_symlink))
    symlink_target = os.readlink(symlink_path)
    if os.path.isabs(symlink_target):
      # Absolute path are considered a normal directories. The use case is
      # generally someone who puts the output directory on a separate drive.
      target = symlink_target
    else:
      # The symlink itself could be using the wrong path case.
      target = file_path.fix_native_path_case(target, symlink_target)

    if not os.path.exists(target):
      raise MappingError(
          'Symlink target doesn\'t exist: %s -> %s' % (symlink_path, target))
    target = file_path.get_native_path_case(target)
    if not file_path.path_starts_with(indir, target):
      # Target is outside the build tree: treat the symlink itself as a plain
      # directory/file and keep scanning past it.
      done = symlink_path
      todo = post_symlink
      continue
    if file_path.path_starts_with(target, symlink_path):
      # The symlink points inside itself: following it would loop forever.
      raise MappingError(
          'Can\'t map recursive symlink reference %s -> %s' %
          (symlink_path, target))
    logging.info('Found symlink: %s -> %s', symlink_path, target)
    symlinks.append(os.path.relpath(symlink_path, indir))
    # Treat the common prefix of the old and new paths as done, and start
    # scanning again.
    target = target.split(os.path.sep)
    symlink_path = symlink_path.split(os.path.sep)
    prefix_length = 0
    for target_piece, symlink_path_piece in zip(target, symlink_path):
      if target_piece == symlink_path_piece:
        prefix_length += 1
      else:
        break
    done = os.path.sep.join(target[:prefix_length])
    todo = os.path.join(
        os.path.sep.join(target[prefix_length:]), post_symlink)

  # Re-relativize the fully resolved path and restore the trailing separator
  # for directories (bool * str appends os.path.sep only when is_directory).
  relfile = os.path.relpath(done, indir)
  relfile = relfile.rstrip(os.path.sep) + is_directory * os.path.sep
  return relfile, symlinks
1406
1407
def expand_directory_and_symlink(indir, relfile, blacklist, follow_symlinks):
  """Expands a single input. It can result in multiple outputs.

  This function is recursive when relfile is a directory.

  Note: this code doesn't properly handle recursive symlink like one created
  with:
    ln -s .. foo

  Arguments:
    indir: root directory that all results are relative to.
    relfile: path relative to |indir|; a trailing os.path.sep denotes a
        directory to be expanded recursively.
    blacklist: optional callable returning True for relative paths to skip.
        Only applied to entries found while listing a directory; a file passed
        in directly is always returned.
    follow_symlinks: if True, symlinks are resolved via expand_symlinks() and
        included in the result.

  Returns:
    List of indir-relative paths (symlinks first, when any were traversed).

  Raises:
    MappingError on absolute/out-of-tree paths, path case mismatches (except
    on OSX), missing files or a directory given without a trailing slash.
  """
  if os.path.isabs(relfile):
    raise MappingError('Can\'t map absolute path %s' % relfile)

  infile = file_path.normpath(os.path.join(indir, relfile))
  if not infile.startswith(indir):
    raise MappingError('Can\'t map file %s outside %s' % (infile, indir))

  filepath = os.path.join(indir, relfile)
  native_filepath = file_path.get_native_path_case(filepath)
  if filepath != native_filepath:
    # Special case './'.
    if filepath != native_filepath + '.' + os.path.sep:
      # Give up enforcing strict path case on OSX. Really, it's that sad. The
      # case where it happens is very specific and hard to reproduce:
      # get_native_path_case(
      #    u'Foo.framework/Versions/A/Resources/Something.nib') will return
      #    u'Foo.framework/Versions/A/resources/Something.nib', e.g. lowercase 'r'.
      #
      # Note that this is really something deep in OSX because running
      #  ls Foo.framework/Versions/A
      # will print out 'Resources', while file_path.get_native_path_case()
      # returns a lower case 'r'.
      #
      # So *something* is happening under the hood resulting in the command 'ls'
      # and Carbon.File.FSPathMakeRef('path').FSRefMakePath() to disagree. We
      # have no idea why.
      if sys.platform != 'darwin':
        raise MappingError(
            'File path doesn\'t equal native file path\n%s != %s' %
            (filepath, native_filepath))

  symlinks = []
  if follow_symlinks:
    # |relfile| may be rewritten to its symlink-free form.
    relfile, symlinks = expand_symlinks(indir, relfile)

  if relfile.endswith(os.path.sep):
    if not os.path.isdir(infile):
      raise MappingError(
          '%s is not a directory but ends with "%s"' % (infile, os.path.sep))

    # Special case './'.
    if relfile.startswith('.' + os.path.sep):
      relfile = relfile[2:]
    # Start the result with any symlinks traversed above, then recurse into
    # every non-blacklisted entry of the directory.
    outfiles = symlinks
    try:
      for filename in os.listdir(infile):
        inner_relfile = os.path.join(relfile, filename)
        if blacklist and blacklist(inner_relfile):
          continue
        if os.path.isdir(os.path.join(indir, inner_relfile)):
          # Append the separator so the recursive call expands it.
          inner_relfile += os.path.sep
        outfiles.extend(
            expand_directory_and_symlink(indir, inner_relfile, blacklist,
                                         follow_symlinks))
      return outfiles
    except OSError as e:
      raise MappingError(
          'Unable to iterate over directory %s.\n%s' % (infile, e))
  else:
    # Always add individual files even if they were blacklisted.
    if os.path.isdir(infile):
      raise MappingError(
          'Input directory %s must have a trailing slash' % infile)

    if not os.path.isfile(infile):
      raise MappingError('Input file %s doesn\'t exist' % infile)

    return symlinks + [relfile]
1485
1486
def process_input(filepath, prevdict, read_only, flavor, algo):
  """Processes an input file, a dependency, and return meta data about it.

  Behaviors:
  - Retrieves the file mode, file size, file timestamp, file link
    destination if it is a file link and calcultate the SHA-1 of the file's
    content if the path points to a file and not a symlink.

  Arguments:
    filepath: File to act on.
    prevdict: the previous dictionary. It is used to retrieve the cached sha-1
        to skip recalculating the hash. Optional.
    read_only: If 1 or 2, the file mode is manipulated. In practice, only save
        one of 4 modes: 0755 (rwx), 0644 (rw), 0555 (rx), 0444 (r). On
        windows, mode is not set since all files are 'executable' by
        default.
    flavor: One isolated flavor, like 'linux', 'mac' or 'win'.
    algo: Hashing algorithm used.

  Returns:
    The necessary data to create a entry in the 'files' section of an .isolated
    file.
  """
  # TODO(csharp): crbug.com/150823 - the 'T' (touched) fast path used to be
  # handled here; re-enable it once the bug is fixed.
  out = {}

  # Always stat the file without following links; the timestamp decides
  # whether cached values from |prevdict| can be reused. A file whose mtime
  # was manually reset while its content changed defeats this check; we don't
  # protect against that use case.
  try:
    filestats = os.lstat(filepath)
  except OSError:
    # The file is not present.
    raise MappingError('%s is missing' % filepath)
  is_link = stat.S_ISLNK(filestats.st_mode)

  if flavor != 'win':
    # Ignore file mode on Windows since it's not really useful there.
    filemode = stat.S_IMODE(filestats.st_mode)
    # Remove write access for group and all access to 'others'.
    filemode &= ~(stat.S_IWGRP | stat.S_IRWXO)
    if read_only:
      filemode &= ~stat.S_IWUSR
    # Mirror the owner's execute bit onto the group.
    if filemode & stat.S_IXUSR:
      filemode |= stat.S_IXGRP
    else:
      filemode &= ~stat.S_IXGRP
    if not is_link:
      out['m'] = filemode

  # Used to skip recalculating the hash or link destination. Use the most
  # recent update time.
  # TODO(maruel): Save it in the .state file instead of .isolated so the
  # .isolated file is deterministic.
  out['t'] = int(round(filestats.st_mtime))

  if is_link:
    # If the timestamp wasn't updated, carry on the link destination.
    if prevdict.get('t') == out['t']:
      # Reuse the previous link destination if available.
      out['l'] = prevdict.get('l')
    if out.get('l') is None:
      # The link could be in an incorrect path case. In practice, this only
      # happen on OSX on case insensitive HFS.
      # TODO(maruel): It'd be better if it was only done once, in
      # expand_directory_and_symlink(), so it would not be necessary to do
      # again here.
      symlink_value = os.readlink(filepath)  # pylint: disable=E1101
      filedir = file_path.get_native_path_case(os.path.dirname(filepath))
      native_dest = file_path.fix_native_path_case(filedir, symlink_value)
      out['l'] = os.path.relpath(native_dest, filedir)
  else:
    out['s'] = filestats.st_size
    # If the timestamp wasn't updated and the file size is still the same,
    # carry on the sha-1.
    if prevdict.get('t') == out['t'] and prevdict.get('s') == out['s']:
      # Reuse the previous hash if available.
      out['h'] = prevdict.get('h')
    if not out.get('h'):
      out['h'] = hash_file(filepath, algo)
  return out
1579
1580
def save_isolated(isolated, data):
  """Writes one or multiple .isolated files.

  Note: this reference implementation does not create child .isolated file so
  it always returns an empty list.

  Returns the list of child isolated files that are included by |isolated|.
  """
  # Round-trip |data| through the validator so malformed content is rejected
  # before anything is written to disk.
  serialized = json.dumps(data)
  hashing_algo = SUPPORTED_ALGOS[data['algo']]
  load_isolated(serialized, data.get('flavor'), hashing_algo)
  tools.write_json(isolated, data, True)
  return []
1594
1595
def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The base url, it is assume that |base_url|/has/ can be used to
        query if an element was already uploaded, and |base_url|/store/
        can be used to upload a new element.
    indir: Root directory the infiles are based in.
    infiles: dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server.

  Returns:
    0 on success.
  """
  logging.info('upload_tree(indir=%s, files=%d)', indir, len(infiles))

  # Convert |indir| + |infiles| into a list of FileItem objects. Symlinks are
  # skipped, since they are not represented by items on isolate server side.
  items = []
  for relpath, meta in infiles.iteritems():
    if 'l' in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(indir, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=meta.get('priority') == '0'))

  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)
  return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001625
1626
def load_isolated(content, os_flavor, algo):
  """Verifies the .isolated file is valid and loads this object with the json
  data.

  Arguments:
  - content: raw serialized content to load.
  - os_flavor: OS to load this file on. Optional.
  - algo: hashlib algorithm class. Used to confirm the algorithm matches the
          algorithm used on the Isolate Server.

  Returns:
    The decoded dict, with file paths converted to the native os.path.sep.

  Raises:
    ConfigError if the content is not valid .isolated data.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  # Check 'version' first, since it could modify the parsing after.
  # TODO(maruel): Drop support for unversioned .isolated file around Jan 2014.
  value = data.get('version', ISOLATED_FILE_VERSION)
  if not isinstance(value, basestring):
    raise ConfigError('Expected string, got %r' % value)
  if not re.match(r'^(\d+)\.(\d+)$', value):
    raise ConfigError('Expected a compatible version, got %r' % value)
  # Only the major version must match; minor revisions are compatible.
  if value.split('.', 1)[0] != ISOLATED_FILE_VERSION.split('.', 1)[0]:
    raise ConfigError(
        'Expected compatible \'%s\' version, got %r' %
        (ISOLATED_FILE_VERSION, value))

  if algo is None:
    # TODO(maruel): Remove the default around Jan 2014.
    # Default the algorithm used in the .isolated file itself, falls back to
    # 'sha-1' if unspecified.
    algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]

  # Validate each known top level key; anything unrecognized is fatal.
  for key, value in data.iteritems():
    if key == 'algo':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
      if value not in SUPPORTED_ALGOS:
        raise ConfigError(
            'Expected one of \'%s\', got %r' %
            (', '.join(sorted(SUPPORTED_ALGOS)), value))
      if value != SUPPORTED_ALGOS_REVERSE[algo]:
        raise ConfigError(
            'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))

    elif key == 'command':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      # Each entry maps a relative path to its properties: 'h' (hash),
      # 's' (size), 'm' (mode), 'l' (link destination).
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            if not is_valid_hash(subsubvalue, algo):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            if not isinstance(subsubvalue, (int, long)):
              raise ConfigError('Expected int or long, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        # An entry is either a link or a hashed file with a size, exclusively.
        if bool('h' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
              subvalue)
        if bool('h' in subvalue) != bool('s' in subvalue):
          raise ConfigError(
              'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
              subvalue)
        if bool('s' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
              subvalue)
        if bool('l' in subvalue) and bool('m' in subvalue):
          raise ConfigError(
              'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not is_valid_hash(subvalue, algo):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if value not in (0, 1, 2):
        raise ConfigError('Expected 0, 1 or 2, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      if os_flavor and value != os_flavor:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (os_flavor, value))

    elif key == 'version':
      # Already checked above.
      pass

    else:
      raise ConfigError('Unknown key %r' % key)

  # Automatically fix os.path.sep if necessary. While .isolated files are
  # always in the the native path format, someone could want to download an
  # .isolated tree from another OS.
  wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
  if 'files' in data:
    data['files'] = dict(
        (k.replace(wrong_path_sep, os.path.sep), v)
        for k, v in data['files'].iteritems())
    for v in data['files'].itervalues():
      if 'l' in v:
        v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
  if 'relative_cwd' in data:
    data['relative_cwd'] = data['relative_cwd'].replace(
        wrong_path_sep, os.path.sep)
  return data
1769
1770
class IsolatedFile(object):
  """Represents a single parsed .isolated file."""

  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)' % obj_hash)
    self.obj_hash = obj_hash
    self.algo = algo
    # Becomes True once everything on the left side of the include tree has
    # been parsed. 'Tree' here means the .isolate and all the .isolated files
    # recursively included by it with the 'includes' key. The order of each
    # sha-1 in 'includes' is important, as the later ones are not processed
    # until the firsts are retrieved and read.
    self.can_fetch = False

    # Raw decoded json data.
    self.data = {}
    # One IsolatedFile instance per entry in self.data['includes'].
    self.children = []

    # Flips to True once load() successfully parsed the content.
    self._is_parsed = False
    # Flips to True once fetch_files() queued all the files.
    self.files_fetched = False

  def load(self, os_flavor, content):
    """Verifies the .isolated file is valid and loads this object with the json
    data.
    """
    logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    self.children = [
        IsolatedFile(include, self.algo)
        for include in self.data.get('includes', [])
    ]
    self._is_parsed = True

  def fetch_files(self, fetch_queue, files):
    """Adds files in this .isolated file not present in |files| dictionary.

    Preemptively request files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if not self._is_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for relpath, props in self.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overriden files must not be fetched.
      if relpath in files:
        continue
      files[relpath] = props
      if 'h' in props:
        # Preemptively request files.
        logging.debug('fetching %s' % relpath)
        fetch_queue.add(props['h'], props['s'], WorkerPool.MED)
    self.files_fetched = True
1828
1829
class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    # Command to run, aggregated from the 'command' keys of the tree.
    self.command = []
    # Merged 'files' sections, path -> properties dict.
    self.files = {}
    # Aggregated 'read_only' value; None until one is seen.
    self.read_only = None
    # Aggregated 'relative_cwd' value; None until one is seen.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      # Queues one .isolated file for fetching, guarding against include
      # cycles.
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=WorkerPool.HIGH)

    retrieve(self.root)

    # Process each .isolated file as soon as its content arrives, queueing
    # its own includes in turn, until the whole tree is parsed.
    while pending:
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    def check(n):
      # Sanity check: by now every node must have queued its files.
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    self.relative_cwd = self.relative_cwd or ''

  def _traverse_tree(self, fetch_queue, node):
    # Walks the include tree left-to-right, queueing files of fetchable nodes
    # and unlocking at most one new child per parent per pass.
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    # Merges |node|'s files into self.files and picks up command/read_only/
    # relative_cwd from the first node that defines them.
    node.fetch_files(fetch_queue, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1924
1925
def fetch_isolated(
    isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    algo: hash algorithm to use.
    outdir: Output directory to map file tree to.
    os_flavor: OS flavor to choose when reading sections of *.isolated file.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about loaded *.isolated file.

  Raises:
    ConfigError if require_command is True and no command was specified.
    MappingError if the cache cannot hold all the requested files.
  """
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not is_valid_hash(isolated_hash, algo):
        isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)

      # Load all *.isolated and start loading rest of the files.
      settings.load(fetch_queue, isolated_hash, os_flavor, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise ConfigError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_symlinks(outdir, settings.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props). Multiple paths may
      # share the same content, so all of them are linked per digest.
      remaining = {}
      for filepath, props in settings.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise MappingError('Cache is too small to hold all requested files')
  return settings
2004
2005
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  # Walk the whole tree, following symlinks, and stat/hash every entry found.
  metadata = {}
  for relpath in expand_directory_and_symlink(root, './', blacklist, True):
    metadata[relpath] = process_input(
        os.path.join(root, relpath), {}, False, sys.platform, algo)
  # Timestamps would make the metadata non-deterministic; drop them.
  for entry in metadata.itervalues():
    entry.pop('t')
  # Symlinks ('l' entries, no 'h') have no content to upload; skip them.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
2026
2027
def archive_files_to_storage(storage, algo, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    algo: an hashlib class to hash content. Usually hashlib.sha1.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    List of (hash, path) pairs, one per entry in |files|, in the same order.

  Raises:
    Error on duplicate entries, unknown paths or OS failures while reading.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory. Generate a .isolated file describing
          # it and upload that file along with the directory's content.
          items, metadata = directory_to_metadata(filepath, algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
              'algo': SUPPORTED_ALGOS_REVERSE[algo],
              'files': metadata,
              'version': ISOLATED_FILE_VERSION,
          }
          save_isolated(isolated, data)
          h = hash_file(isolated, algo)
          items_to_upload.extend(items)
          # The .isolated file itself is uploaded with high priority since
          # consumers need it first to know what else to fetch.
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = hash_file(filepath, algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded but we don't
    # much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    # Clean up the generated .isolated files.
    if tempdir:
      shutil.rmtree(tempdir)
2096
2097
def archive(out, namespace, files, blacklist):
  """Archives a list of files/directories and prints one 'hash path' per entry.

  Arguments:
    out: isolate server URL, or a local directory in 'hashtable' mode.
    namespace: namespace to use on the isolate server.
    files: list of paths to archive; the single entry '-' means the list of
        paths is read from stdin, one per line.
    blacklist: list of regexps of files to skip inside directories.

  Raises:
    Error if there is nothing to upload.
  """
  if files == ['-']:
    # readlines() keeps each line's trailing newline; it must be stripped or
    # it becomes part of the path and the file lookup fails. Blank lines are
    # ignored.
    files = [l.rstrip('\r\n') for l in sys.stdin.readlines()]
    files = [f for f in files if f]

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  algo = get_hash_algo(namespace)
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, algo, files, blacklist)
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
2111
2112
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created the whole directory
  is uploaded. Then this .isolated file can be included in another one to run
  commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  # Server mode only: no --indir is registered for this command.
  add_isolate_server_options(parser, False)
  parser.add_option(
      '--blacklist', action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, paths = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  try:
    archive(options.isolate_server, options.namespace, paths, options.blacklist)
  except Error as e:
    # Surface archival failures as command line errors.
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002138
2139
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  # add_indir=True: this command can read from a local hashtable directory too.
  add_isolate_server_options(parser, True)
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Exactly one of the two fetch modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)

  remote = options.isolate_server or options.indir
  with get_storage(remote, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            WorkerPool.MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      # Drain the channel; each pull() returns the digest of a completed fetch.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      settings = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=MemoryCache(),
          algo=get_hash_algo(options.namespace),
          outdir=options.target,
          os_flavor=None,
          require_command=False)
      # |rel| is already absolute since options.target was abspath'd above;
      # joining relative_cwd exactly once is all that is needed.
      rel = os.path.join(options.target, settings.relative_cwd)
      print('To run this test please run from the directory %s:' % rel)
      print(' ' + ' '.join(settings.command))

  return 0
2201
2202
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDhashtable(parser, args):
  """Archives data to a hashtable on the file system.

  If a directory is specified, a .isolated file is created the whole directory
  is uploaded. Then this .isolated file can be included in another one to run
  commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_outdir_options(parser)
  parser.add_option(
      '--blacklist', action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, paths = parser.parse_args(args)
  process_outdir_options(parser, options, os.getcwd())
  try:
    # Do not compress files when archiving to the file system.
    archive(options.outdir, 'default', paths, options.blacklist)
  except Error as e:
    # Surface archival failures as command line errors.
    parser.error(e.args[0])
  return 0
2229
2230
def add_isolate_server_options(parser, add_indir):
  """Registers the --isolate-server and --namespace flags on |parser|.

  When |add_indir| is True, also registers --indir so the command can target a
  local hashtable directory instead of a server.
  """
  parser.add_option(
      '-I', '--isolate-server', metavar='URL',
      default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
  if not add_indir:
    return
  parser.add_option(
      '--indir', metavar='DIR',
      help='Directory used to store the hashtable instead of using an '
           'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002250
2251
def process_isolate_server_options(parser, options):
  """Processes the --isolate-server and --indir options and aborts if neither is
  specified.
  """
  # --indir is only registered when add_isolate_server_options() was called
  # with add_indir=True, so probe for it instead of assuming it exists.
  has_indir = hasattr(options, 'indir')
  if not options.isolate_server:
    if not has_indir:
      parser.error('--isolate-server is required.')
    elif not options.indir:
      parser.error('Use one of --indir or --isolate-server.')
  else:
    # Exactly one of the two destinations may be set.
    if has_indir and options.indir:
      parser.error('Use only one of --indir or --isolate-server.')

  if options.isolate_server:
    # 'https' is the scheme assumed when the URL does not specify one, matching
    # the --isolate-server help text.
    parts = urlparse.urlparse(options.isolate_server, 'https')
    if parts.query:
      parser.error('--isolate-server doesn\'t support query parameter.')
    if parts.fragment:
      parser.error('--isolate-server doesn\'t support fragment in the url.')
    # urlparse('foo.com') will result in netloc='', path='foo.com', which is not
    # what is desired here.
    new = list(parts)
    if not new[1] and new[2]:
      # Move the host from the path slot (index 2) to the netloc slot (index 1).
      new[1] = new[2].rstrip('/')
      new[2] = ''
    # Drop any trailing slash from the path in all cases.
    new[2] = new[2].rstrip('/')
    options.isolate_server = urlparse.urlunparse(new)
    return

  # From this point on --indir is in use: normalize it to an absolute native
  # path and require it to exist.
  if file_path.is_url(options.indir):
    parser.error('Can\'t use an URL for --indir.')
  options.indir = unicode(options.indir).replace('/', os.path.sep)
  options.indir = os.path.abspath(
      os.path.normpath(os.path.join(os.getcwd(), options.indir)))
  if not os.path.isdir(options.indir):
    parser.error('Path given to --indir must exist.')
2289
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002290
2291
def add_outdir_options(parser):
  """Registers the -o/--outdir flag on |parser|.

  Orthogonal to --isolate-server: on upload, separate commands are used
  ('archive' vs 'hashtable'), while 'download' can read from either an isolate
  server or a file system.
  """
  parser.add_option(
      '-o', '--outdir',
      metavar='DIR',
      help='Directory used to recreate the tree.')
2302
2303
def process_outdir_options(parser, options, cwd):
  """Validates --outdir and rewrites it as a normalized absolute path.

  Aborts via parser.error() when the flag is missing or points at an URL.
  """
  if not options.outdir:
    parser.error('--outdir is required.')
  if file_path.is_url(options.outdir):
    parser.error('Can\'t use an URL for --outdir.')
  # Switch to the native path separator; native path case is not needed since
  # tracing is never done from there.
  outdir = unicode(options.outdir).replace('/', os.path.sep)
  options.outdir = os.path.abspath(os.path.normpath(os.path.join(cwd, outdir)))
  # The directory is deliberately not created here; defer that until the rest
  # of the command line is known to be error-free.
2315
2316
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser shared by all subcommands.

  Extends the logging-aware parser with the authentication options from the
  auth module and resolves them automatically after parsing.
  """

  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    # Registers the authentication-related flags.
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Processed here so individual commands can assume authentication options
    # are already resolved.
    auth.process_auth_options(self, options)
    return options, args
2331
2332
def main(args):
  """Dispatches to the requested subcommand and returns its exit code.

  Any uncaught exception is reported and converted into exit code 1.
  """
  try:
    return subcommand.CommandDispatcher(__name__).execute(
        OptionParserIsolateServer(), args)
  except Exception as e:
    tools.report_error(e)
    return 1
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002340
2341
if __name__ == '__main__':
  # Normalize the stdio environment before dispatching: fix the encoding of
  # the standard handles, disable output buffering and initialize colorama
  # (ANSI color support, notably on Windows).
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))