#!/usr/bin/env python
# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.

"""Archives a set of files to a server."""

__version__ = '0.2'

import functools
import hashlib
import json
import logging
import os
import re
import sys
import threading
import time
import urllib
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import net
from utils import threading_utils
from utils import tools


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from a network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to the log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
    'md5': hashlib.md5,
    'sha-1': hashlib.sha1,
    'sha-512': hashlib.sha512,
}


# Used for serialization.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
    r'^.+\.(?:run_test_cases)$',
    r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)

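
# A hedged sketch of how these patterns are typically applied; this helper is
# illustrative only (hypothetical name, not used elsewhere in this module).
def _example_is_blacklisted(relpath):
  """Returns True if |relpath| matches any DEFAULT_BLACKLIST regexp."""
  return any(re.match(pattern, relpath) for pattern in DEFAULT_BLACKLIST)
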

class ConfigError(ValueError):
  """Generic failure to load a .isolated file."""
  pass


class MappingError(OSError):
  """Failed to recreate the tree."""
  pass


def is_valid_hash(value, algo):
  """Returns True if the value is a valid hash for the corresponding algo."""
  size = 2 * algo().digest_size
  return bool(re.match(r'^[a-fA-F0-9]{%d}$' % size, value))


def hash_file(filepath, algo):
  """Calculates the hash of a file without reading it all in memory at once.

  |algo| should be one of the hashlib hashing algorithms.
  """
  digest = algo()
  with open(filepath, 'rb') as f:
    while True:
      chunk = f.read(DISK_FILE_CHUNK)
      if not chunk:
        break
      digest.update(chunk)
  return digest.hexdigest()

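
# Illustrative sketch (hypothetical helper, not called by this module):
# hashing a file in chunks and sanity-checking the resulting digest.
def _example_hash_and_validate(filepath):
  """Returns the sha-1 hex digest of |filepath|, asserting it is well formed."""
  digest = hash_file(filepath, hashlib.sha1)
  assert is_valid_hash(digest, hashlib.sha1), digest
  return digest
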

def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    data = stream.read(chunk_size)
    if not data:
      break
    yield data


def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
  """Yields file content in chunks of given |chunk_size|."""
  with open(filepath, 'rb') as f:
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  if not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

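
# A hedged sketch (hypothetical helper, not used by the real flow) showing how
# zip_compress() and zip_decompress() round-trip a byte string through the
# chunked generators above.
def _example_zip_roundtrip(data):
  """Compresses then decompresses |data|; returns the recovered bytes."""
  compressed = zip_compress([data], level=7)
  return ''.join(zip_decompress(compressed))
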

def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot; strip it so the extension can
  # match the entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      os.mkdir(os.path.join(base_directory, d))


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101


def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), actual_size, size)
    return False
  return True


class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  RETRIES = 5

  def __init__(self):
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')


class Item(object):
  """An item to push to Storage.

  It starts its life in a main thread, travels to 'contains' thread, then to
  'push' thread and then finally back to the main thread.

  It is never used concurrently from multiple threads.
  """

  def __init__(self, digest, size, is_isolated=False):
    self.digest = digest
    self.size = size
    self.is_isolated = is_isolated
    self.compression_level = 6
    self.push_state = None

  def content(self, chunk_size):
    """Iterable with content of this item in chunks of given size.

    Arguments:
      chunk_size: preferred size of the chunk to produce, may be ignored.
    """
    raise NotImplementedError()


class FileItem(Item):
  """A file to push to Storage."""

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self, chunk_size):
    return file_read(self.path, chunk_size)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, is_isolated=False):
    super(BufferItem, self).__init__(
        algo(buf).hexdigest(), len(buf), is_isolated)
    self.buffer = buf

  def content(self, _chunk_size):
    return [self.buffer]

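
# Illustrative sketch (hypothetical helper): wrapping an in-memory blob as a
# BufferItem, e.g. before handing it to Storage.upload_items() defined below.
def _example_buffer_item(data):
  """Returns a BufferItem for |data|; digest and size are derived from it."""
  item = BufferItem(data, hashlib.sha1)
  assert item.size == len(data)
  return item
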

class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi."""

  def __init__(self, storage_api, use_zip):
    self.use_zip = use_zip
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    return False

  def upload_tree(self, indir, infiles):
    """Uploads the given tree to the isolate server.

    Arguments:
      indir: root directory the infiles are based in.
      infiles: dict of files to upload from |indir|.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))

    # Convert |indir| + |infiles| into a list of FileItem objects.
    # Filter out symlinks, since they are not represented by items on isolate
    # server side.
    items = [
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            is_isolated=metadata.get('priority') == '0')
        for filepath, metadata in infiles.iteritems()
        if 'l' not in metadata
    ]

    return self.upload_items(items)

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    Will upload only items that are missing.

    Arguments:
      items: list of Item instances that represent the data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    channel = threading_utils.TaskChannel()
    for missing_item in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(
          channel,
          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
          missing_item)

    uploaded = []
    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if the underlying protocol doesn't support this.
    """
    return self._storage_api.get_fetch_url(digest)

  def async_push(self, channel, priority, item):
    """Starts asynchronous push to the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      priority: thread pool task priority for the push.
      item: item to upload as instance of Item class.
    """
    def push(content):
      """Pushes an item and returns its id, to pass as a result to |channel|."""
      self._storage_api.push(item, content)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push,
          item.content(DISK_FILE_CHUNK))
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
                              item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception(exc)
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self.use_zip:
          stream = zip_decompress(stream, DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.warning('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      Item objects that are missing from the server.
    """
    channel = threading_utils.TaskChannel()
    pending = 0
    # Enqueue all requests.
    for batch in self.batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1
    # Yield results as they come in.
    for _ in xrange(pending):
      for missing in channel.pull():
        yield missing

  @staticmethod
  def batch_items_for_check(items):
    """Splits list of items to check for existence on the server into batches.

    Each batch corresponds to a single 'exists?' query to the server via a call
    to StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects.

    Yields:
      Batches of items to query for existence in a single operation,
      each batch is a list of Item objects.
    """
    batch_count = 0
    batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
    next_queries = []
    for item in sorted(items, key=lambda x: x.size, reverse=True):
      next_queries.append(item)
      if len(next_queries) == batch_size_limit:
        yield next_queries
        next_queries = []
        batch_count += 1
        batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    if next_queries:
      yield next_queries

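
# A hedged sketch of the batching behavior above (hypothetical helper): for
# 230 items the yielded batch sizes follow ITEMS_PER_CONTAINS_QUERIES, i.e.
# 20, 20, 50, 50, 50 and then a final partial batch of 40; any later batches
# would stay capped at 100.
def _example_batch_sizes(items):
  """Returns the list of batch sizes produced for |items|."""
  return [len(batch) for batch in Storage.batch_items_for_check(items)]
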

class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())

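
# A hedged sketch of the add/wait pattern callers typically use with
# FetchQueue (hypothetical helper and priority choice, not part of the module).
def _example_fetch_all(fetch_queue, digests):
  """Blocks until every digest in |digests| is fetched into the cache."""
  for digest in digests:
    fetch_queue.add(WorkerPool.MED, digest)
  remaining = set(digests)
  while remaining:
    remaining.discard(fetch_queue.wait(remaining))
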

class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to LocalCache."""

  def __init__(self, stream, expected_size):
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise MappingError('Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations."""

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, content):
    """Uploads an |item| with content generated by |content| generator.

    Arguments:
      item: Item object that holds information about an item being pushed.
      content: a generator that yields chunks to push.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for existence of given |items| on the server.

    Mutates |items| by assigning an opaque implementation-specific object to
    Item's push_state attribute on missing entries in the datastore.

    Arguments:
      items: list of Item objects.

    Returns:
      A list of items missing on server as a list of Item objects.
    """
    raise NotImplementedError()


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  """

  class _PushState(object):
    """State needed to call .push(), to be stored in Item.push_state."""
    def __init__(self, upload_url, finalize_url):
      self.upload_url = upload_url
      self.finalize_url = finalize_url
      self.uploaded = False
      self.finalized = False

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self.base_url = base_url.rstrip('/')
    self.namespace = namespace
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if self._server_caps is None:
        request_body = json.dumps(
            self._generate_handshake_request(), separators=(',', ':'))
        response = net.url_read(
            url=self.base_url + '/content-gs/handshake',
            data=request_body,
            content_type='application/json',
            method='POST')
        if response is None:
          raise MappingError('Failed to perform handshake.')
        try:
          caps = json.loads(response)
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print the exception class
          # name as well.
          raise MappingError('Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  def get_fetch_url(self, digest):
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self.base_url, self.namespace, digest)

  def fetch(self, digest):
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s)', source_url)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
    if not connection:
      raise IOError('Unable to open connection to %s' % source_url)
    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, content):
    assert isinstance(item, Item)
    assert isinstance(item.push_state, IsolateServer._PushState)
    assert not item.push_state.finalized

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it cannot be rewound back to the
    # beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not item.push_state.uploaded:
      # A cheesy way to avoid memcpy of a (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=item.push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, item.push_state.upload_url))
      item.push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if item.push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      response = net.url_read(
          url=item.push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      item.push_state.finalized = True

  def contains(self, items):
    logging.info('Checking existence of %d files...', len(items))

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.is_isolated),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self.base_url,
        self.namespace,
        urllib.quote(self._server_capabilities['access_token']))
    response_body = net.url_read(
        url=query_url,
        data=json.dumps(body, separators=(',', ':')),
        content_type='application/json',
        method='POST')
    if response_body is None:
      raise MappingError('Failed to execute /pre-upload query')

    # Response body is a list of push_urls (or null if file is already present).
    try:
      response = json.loads(response_body)
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response_body))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = []
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        item = items[i]
        assert item.push_state is None
        item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
        missing_items.append(item)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items


class FileSystem(StorageApi):
  """StorageApi implementation that fetches data from the file system.

  The common use case is an NFS/CIFS file server that is mounted locally and
  used to fetch files onto a local partition.
  """

  def __init__(self, base_path):
    super(FileSystem, self).__init__()
    self.base_path = base_path

  def get_fetch_url(self, digest):
    return None

  def fetch(self, digest):
    assert isinstance(digest, basestring)
    return file_read(os.path.join(self.base_path, digest))

  def push(self, item, content):
    assert isinstance(item, Item)
    file_write(os.path.join(self.base_path, item.digest), content)

  def contains(self, items):
    return [
        item for item in items
        if not os.path.exists(os.path.join(self.base_path, item.digest))
    ]


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has the same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self):
    super(MemoryCache, self).__init__()
    # Let's not assume dict is thread safe.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Assemble the whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no filenode to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      # Ignores all other bits.
      os.chmod(dest, file_mode & 0500)


def get_hash_algo(_namespace):
  """Returns the hash algorithm class to use when uploading to |namespace|."""
  # TODO(vadimsh): Implement this at some point.
  return hashlib.sha1


def is_namespace_with_compression(namespace):
  """Returns True if given |namespace| stores compressed objects."""
  return namespace.endswith(('-gzip', '-deflate'))


def get_storage_api(file_or_url, namespace):
  """Returns an object that implements the StorageApi interface."""
  if re.match(r'^https?://.+$', file_or_url):
    return IsolateServer(file_or_url, namespace)
  else:
    return FileSystem(file_or_url)


def get_storage(file_or_url, namespace):
  """Returns Storage configured with an appropriate StorageApi instance."""
  return Storage(
      get_storage_api(file_or_url, namespace),
      is_namespace_with_compression(namespace))

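
# A hedged usage sketch (hypothetical helper; assumes a reachable server and
# the commonly used 'default-gzip' namespace). Storage is a context manager,
# so pending thread pool tasks are joined on exit.
def _example_fetch_url(server_url, digest):
  """Returns the fetch URL for |digest|, or None if unsupported."""
  with get_storage(server_url, 'default-gzip') as storage:
    return storage.get_fetch_url(digest)
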

def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The base url, it is assumed that |base_url|/has/ can be used to
              query if an element was already uploaded, and |base_url|/store/
              can be used to upload a new element.
    indir: Root directory the infiles are based in.
    infiles: dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server.
  """
  with get_storage(base_url, namespace) as storage:
    storage.upload_tree(indir, infiles)
  return 0


def load_isolated(content, os_flavor, algo):
  """Verifies the .isolated file is valid and loads this object with the json
  data.

  Arguments:
  - content: raw serialized content to load.
  - os_flavor: OS to load this file on. Optional.
  - algo: hashlib algorithm class. Used to confirm the algorithm matches the
          algorithm used on the Isolate Server.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  # Check 'version' first, since it could modify the parsing after.
  # TODO(maruel): Drop support for unversioned .isolated file around Jan 2014.
  value = data.get('version', '1.0')
  if not isinstance(value, basestring):
    raise ConfigError('Expected string, got %r' % value)
  if not re.match(r'^(\d+)\.(\d+)$', value):
    raise ConfigError('Expected a compatible version, got %r' % value)
  if value.split('.', 1)[0] != '1':
    raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)

  if algo is None:
    # TODO(maruel): Remove the default around Jan 2014.
    # Default to the algorithm used in the .isolated file itself, falling back
    # to 'sha-1' if unspecified. Note this maps the algorithm *name* to its
    # hashlib class, hence SUPPORTED_ALGOS (SUPPORTED_ALGOS_REVERSE maps the
    # other way).
    algo = SUPPORTED_ALGOS[data.get('algo', 'sha-1')]

  for key, value in data.iteritems():
    if key == 'algo':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
      if value not in SUPPORTED_ALGOS:
        raise ConfigError(
            'Expected one of \'%s\', got %r' %
            (', '.join(sorted(SUPPORTED_ALGOS)), value))
      if value != SUPPORTED_ALGOS_REVERSE[algo]:
        raise ConfigError(
            'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))

    elif key == 'command':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            if not is_valid_hash(subsubvalue, algo):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        if bool('h' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
              subvalue)
        if bool('h' in subvalue) != bool('s' in subvalue):
          raise ConfigError(
              'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
              subvalue)
        if bool('s' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
              subvalue)
        if bool('l' in subvalue) and bool('m' in subvalue):
          raise ConfigError(
              'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not is_valid_hash(subvalue, algo):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not isinstance(value, bool):
        raise ConfigError('Expected bool, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      if os_flavor and value != os_flavor:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (os_flavor, value))

    elif key == 'version':
      # Already checked above.
      pass

    else:
      raise ConfigError('Unknown key %r' % key)

  # Automatically fix os.path.sep if necessary. While .isolated files are
  # always in the native path format, someone could want to download an
  # .isolated tree from another OS.
  wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
  if 'files' in data:
    data['files'] = dict(
        (k.replace(wrong_path_sep, os.path.sep), v)
        for k, v in data['files'].iteritems())
    for v in data['files'].itervalues():
      if 'l' in v:
        v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
  if 'relative_cwd' in data:
    data['relative_cwd'] = data['relative_cwd'].replace(
        wrong_path_sep, os.path.sep)
  return data

1315
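# A minimal sketch of a dict accepted by load_isolated() above, assuming the
# default 'sha-1' algorithm; the digests, file names and sizes below are
# placeholders, not real values:
#
#   data = {
#     'version': '1.0',
#     'algo': 'sha-1',
#     'command': ['python', 'run_test.py'],
#     'relative_cwd': '.',
#     'files': {
#       'run_test.py': {'h': '0' * 40, 's': 4096, 'm': 0500},
#       'run_test_link.py': {'l': 'run_test.py'},
#     },
#   }
#   load_isolated(json.dumps(data), None, hashlib.sha1)
#
# Note that each entry in 'files' is either a real file ('h' and 's', with an
# optional mode 'm') or a symlink ('l'), never both.

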
class IsolatedFile(object):
  """Represents a single parsed .isolated file."""
  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the hash of the file."""
    logging.debug('IsolatedFile(%s)' % obj_hash)
    self.obj_hash = obj_hash
    self.algo = algo
    # Set once all the left side of the tree is parsed. 'Tree' here means the
    # root .isolated and all the .isolated files recursively included by it
    # via the 'includes' key. The order of the hashes in 'includes', each
    # representing a .isolated file in the hash table, is important, as the
    # later ones are not processed until the earlier ones are retrieved and
    # read.
    self.can_fetch = False

    # Raw data.
    self.data = {}
    # An IsolatedFile instance, one per object in self.data['includes'].
    self.children = []

    # Set once the .isolated file is loaded.
    self._is_parsed = False
    # Set once the files are fetched.
    self.files_fetched = False

  def load(self, os_flavor, content):
    """Verifies the .isolated file is valid and loads this object with the json
    data.
    """
    logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    self.children = [
        IsolatedFile(i, self.algo) for i in self.data.get('includes', [])
    ]
    self._is_parsed = True

  def fetch_files(self, fetch_queue, files):
    """Adds files in this .isolated file not present in the |files| dictionary.

    Preemptively requests files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if not self._is_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # The root .isolated has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
      if filepath not in files:
        files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s' % filepath)
          fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
    self.files_fetched = True


class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated files asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left side of the tree have been fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.
    """
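    # For instance, with hypothetical digests A..D, if A.isolated lists
    # includes = [B, C] and B.isolated lists includes = [D], all four
    # .isolated files are fetched concurrently, but the files listed in C are
    # only requested once B's subtree (B and D, C's 'left') has been parsed
    # and its files enqueued; a file that A overrides is thus never requested
    # from B or C.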
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(WorkerPool.HIGH, h)

    retrieve(self.root)

    while pending:
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False

  def _traverse_tree(self, fetch_queue, node):
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first unfetchable child as fetchable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    node.fetch_files(fetch_queue, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on Windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def fetch_isolated(
    isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
  """Aggressively downloads the .isolated file(s), then downloads all files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    algo: hash algorithm to use.
    outdir: Output directory to map file tree to.
    os_flavor: OS flavor to choose when reading sections of *.isolated file.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about the loaded *.isolated file.
  """
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not is_valid_hash(isolated_hash, algo):
        isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)

      # Load all *.isolated and start loading the rest of the files.
      settings.load(fetch_queue, isolated_hash, os_flavor, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueued and there's
        # no easy way to cancel them.
        raise ConfigError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_symlinks(outdir, settings.files.iteritems())

      # Ensure the working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in settings.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
                   fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # The cache could evict some items we just tried to fetch; it's a fatal
    # error.
    if not fetch_queue.verify_all_cached():
      raise MappingError('Cache is too small to hold all requested files')
  return settings


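# A minimal sketch of calling fetch_isolated() directly, mirroring what
# CMDdownload() below does; the server URL, hash and output path are
# placeholders:
#
#   storage = get_storage('https://isolate.example.com', 'default-gzip')
#   settings = fetch_isolated(
#       isolated_hash='a' * 40,
#       storage=storage,
#       cache=MemoryCache(),
#       algo=get_hash_algo('default-gzip'),
#       outdir='/tmp/out',
#       os_flavor=None,
#       require_command=False)
#   print(' '.join(settings.command))

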
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server."""
  options, files = parser.parse_args(args)

  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    parser.error('Nothing to upload')

  # Load the necessary metadata.
  # TODO(maruel): Use a worker pool to upload as the hashing is being done.
  infiles = dict(
      (
        f,
        {
          's': os.stat(f).st_size,
          'h': hash_file(f, get_hash_algo(options.namespace)),
        }
      )
      for f in files)

  with tools.Profiler('Archive'):
    ret = upload_tree(
        base_url=options.isolate_server,
        indir=os.getcwd(),
        infiles=infiles,
        namespace=options.namespace)
  if not ret:
    print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles))
  return ret


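# Example invocation, assuming this script is saved as isolateserver.py (the
# server URL is a placeholder):
#
#   isolateserver.py archive -I https://isolate.example.com file1 file2
#
# On success it prints one '<hash> <path>' line per archived file.

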
def CMDdownload(parser, args):
  """Downloads data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)
  storage = get_storage(options.isolate_server, options.namespace)
  cache = MemoryCache()
  algo = get_hash_algo(options.namespace)

  # Fetching individual files.
  if options.file:
    channel = threading_utils.TaskChannel()
    pending = {}
    for digest, dest in options.file:
      pending[digest] = dest
      storage.async_fetch(
          channel,
          WorkerPool.MED,
          digest,
          UNKNOWN_FILE_SIZE,
          functools.partial(file_write, os.path.join(options.target, dest)))
    while pending:
      fetched = channel.pull()
      dest = pending.pop(fetched)
      logging.info('%s: %s', fetched, dest)

  # Fetching a whole isolated tree.
  if options.isolated:
    settings = fetch_isolated(
        isolated_hash=options.isolated,
        storage=storage,
        cache=cache,
        algo=algo,
        outdir=options.target,
        os_flavor=None,
        require_command=False)
    rel = os.path.join(options.target, settings.relative_cwd)
    print('To run this test please run from the directory %s:' % rel)
    print(' ' + ' '.join(settings.command))

  return 0


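# Example invocations, again assuming the script is saved as isolateserver.py
# (the server URL, hashes and paths are placeholders):
#
#   isolateserver.py download -I https://isolate.example.com \
#       -f <hash> foo.txt -t /tmp/out
#   isolateserver.py download -I https://isolate.example.com \
#       -i <hash> -t /tmp/out

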
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    self.add_option(
        '-I', '--isolate-server',
        metavar='URL', default='',
        help='Isolate server to use')
    self.add_option(
        '--namespace', default='default-gzip',
        help='The namespace to use on the server, default: %default')

  def parse_args(self, *args, **kwargs):
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    options.isolate_server = options.isolate_server.rstrip('/')
    if not options.isolate_server:
      self.error('--isolate-server is required.')
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  try:
    return dispatcher.execute(OptionParserIsolateServer(), args)
  except Exception as e:
    tools.report_error(e)
    return 1


if __name__ == '__main__':
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))