blob: 5f9fd14a98cda61b9d3a78ec0298bf4d49b36e48 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
6"""Archives a set of files to a server."""
7
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00008__version__ = '0.2'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import hashlib
maruel@chromium.org41601642013-09-18 19:40:46 +000012import json
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import sys
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000017import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000019import urllib
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000020import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000022from third_party import colorama
23from third_party.depot_tools import fix_encoding
24from third_party.depot_tools import subcommand
25
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050026from utils import file_path
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000027from utils import net
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000028from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000029from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000030
31
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
# NOTE(review): extensions are listed without the leading dot.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here. Maps wire name -> hashlib constructor.
SUPPORTED_ALGOS = {
  'md5': hashlib.md5,
  'sha-1': hashlib.sha1,
  'sha-512': hashlib.sha512,
}


# Used for serialization: maps a hashlib constructor back to its wire name.
SUPPORTED_ALGOS_REVERSE = dict((v, k) for k, v in SUPPORTED_ALGOS.iteritems())


# Regexps of paths that are not archived by default.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
    r'^.+\.(?:run_test_cases)$',
    r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
115
116
class ConfigError(ValueError):
  """Generic failure to load a .isolated file."""
  # Subclasses ValueError so existing callers catching ValueError still work.
  pass
120
121
class MappingError(OSError):
  """Failed to recreate the tree."""
  # Subclasses OSError so existing callers catching OSError still work.
  pass
125
126
def is_valid_hash(value, algo):
  """Returns if the value is a valid hash for the corresponding algorithm.

  |algo| is a hashlib constructor (e.g. hashlib.sha1); a valid digest is
  exactly 2 * digest_size hexadecimal characters.
  """
  hex_length = 2 * algo().digest_size
  pattern = r'^[a-fA-F0-9]{%d}$' % hex_length
  return re.match(pattern, value) is not None
131
132
def hash_file(filepath, algo):
  """Calculates the hash of a file without reading it all in memory at once.

  |algo| should be one of hashlib hashing algorithm.

  Returns the hex digest as a string.
  """
  digest = algo()
  with open(filepath, 'rb') as f:
    # Feed the file to the hasher in fixed-size chunks to bound memory use.
    chunk = f.read(DISK_FILE_CHUNK)
    while chunk:
      digest.update(chunk)
      chunk = f.read(DISK_FILE_CHUNK)
  return digest.hexdigest()
146
147
def stream_read(stream, chunk_size):
  """Reads chunks of up to |chunk_size| bytes from |stream| and yields them.

  Stops at the first empty read (end of stream).
  """
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
155
156
def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
  """Yields file content in chunks of given |chunk_size|."""
  with open(filepath, 'rb') as f:
    # Stop at the first empty read (end of file).
    chunk = f.read(chunk_size)
    while chunk:
      yield chunk
      chunk = f.read(chunk_size)
165
166
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent_dir = os.path.dirname(filepath)
  if not os.path.isdir(parent_dir):
    os.makedirs(parent_dir)
  written = 0
  with open(filepath, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000185
186
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for data in content_generator:
    # zlib buffers internally; only yield when it actually emits output.
    out = compressor.compress(data)
    if out:
      yield out
  # Flush whatever is left in the compressor's internal buffers.
  remaining = compressor.flush(zlib.Z_FINISH)
  if remaining:
    yield remaining
197
198
def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      # Second argument caps the output size; input zlib could not consume yet
      # is kept in |decompressor.unconsumed_tail|.
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # Drain the rest of the input chunk, still bounded by |chunk_size|.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    # Emit any bytes buffered past the end of input.
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
228
229
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store, no compression) for file types that are already
  compressed, 7 otherwise.
  """
  # BUG FIX: os.path.splitext()[1] keeps the leading dot (e.g. '.zip') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions (e.g. 'zip'), so the
  # membership test never matched and already-compressed files were
  # recompressed. Strip the dot so the comparison works as intended.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
235
236
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file, walking up to the root.
  directories = set()
  for f in files:
    parent = os.path.dirname(f)
    while parent and parent not in directories:
      directories.add(parent)
      parent = os.path.dirname(parent)
  # Sorted order guarantees parents are created before their children.
  for d in sorted(directories):
    os.mkdir(os.path.join(base_directory, d))
249
250
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  # Only entries carrying a symlink target ('l' property) matter here.
  symlinks = ((p, props) for p, props in files if 'l' in props)
  for filepath, properties in symlinks:
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    destination = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000263
264
def is_valid_file(filepath, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  # Unknown expected size: only existence can be checked.
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual = os.stat(filepath).st_size
  if actual == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual, size)
  return False
279
280
class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # Number of retries passed to AutoRetryThreadPool together with [IOError].
  RETRIES = 5

  def __init__(self):
    # NOTE(review): the trailing 0 and 'remote' presumably are the queue size
    # and the thread name prefix — confirm against threading_utils.
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000298
299
class Item(object):
  """An item to push to Storage.

  It starts its life in a main thread, travels to 'contains' thread, then to
  'push' thread and then finally back to the main thread.

  It is never used concurrently from multiple threads.
  """

  def __init__(self, digest, size, is_isolated=False):
    # Opaque per-upload state; presumably set by StorageApi during a push —
    # TODO confirm (not used in this file).
    self.push_state = None
    # zlib compression level used when zipping; subclasses may override it.
    self.compression_level = 6
    # Hex digest identifying the content.
    self.digest = digest
    # Content size in bytes (UNKNOWN_FILE_SIZE, i.e. None, when not known).
    self.size = size
    # Isolated items are pushed with higher priority (see Storage).
    self.is_isolated = is_isolated

  def content(self, chunk_size):
    """Iterable with content of this item in chunks of given size.

    Arguments:
      chunk_size: preferred size of the chunk to produce, may be ignored.

    Subclasses must implement this; the base class has no content.
    """
    raise NotImplementedError()
323
324
class FileItem(Item):
  """A file to push to Storage."""

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    # Path the content is read from.
    self.path = path
    # Already-compressed file types get level 0 (see ALREADY_COMPRESSED_TYPES).
    self.compression_level = get_zip_compression_level(path)

  def content(self, chunk_size):
    """Yields the file content read from disk in chunks of |chunk_size|."""
    return file_read(self.path, chunk_size)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000335
336
class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, is_isolated=False):
    # The digest is derived from the buffer itself using |algo|.
    digest = algo(buf).hexdigest()
    super(BufferItem, self).__init__(digest, len(buf), is_isolated)
    self.buffer = buf

  def content(self, _chunk_size):
    """Returns the whole buffer as a single chunk; chunk size hint is ignored."""
    return [self.buffer]
347
348
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi."""

  def __init__(self, storage_api, use_zip):
    # When True, content is zlib-compressed before push and decompressed on
    # fetch.
    self.use_zip = use_zip
    # Low-level transport implementing StorageApi interface.
    self._storage_api = storage_api
    # Lazily created thread pools, see properties below.
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    return False

  def upload_tree(self, indir, infiles):
    """Uploads the given tree to the isolate server.

    Arguments:
      indir: root directory the infiles are based in.
      infiles: dict of files to upload from |indir|.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))

    # Convert |indir| + |infiles| into a list of FileItem objects.
    # Filter out symlinks, since they are not represented by items on isolate
    # server side.
    items = [
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            is_isolated=metadata.get('priority') == '0')
        for filepath, metadata in infiles.iteritems()
        if 'l' not in metadata
    ]

    return self.upload_items(items)

  def upload_items(self, items):
    """Uploads bunch of items to the isolate server.

    Will upload only items that are missing.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    channel = threading_utils.TaskChannel()
    for missing_item in self.get_missing_items(items):
      missing.add(missing_item)
      # .isolated files are uploaded first since other processes wait on them.
      self.async_push(
          channel,
          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
          missing_item)

    uploaded = []
    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    # BUG FIX: guard the divisions by |total| like the ones by |total_size|,
    # otherwise an empty |items| list raised ZeroDivisionError here.
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total if total else 0,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total if total else 0,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    return self._storage_api.get_fetch_url(digest)

  def async_push(self, channel, priority, item):
    """Starts asynchronous push to the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      priority: thread pool task priority for the push.
      item: item to upload as instance of Item class.
    """
    def push(content):
      """Pushes an item and returns its id, to pass as a result to |channel|."""
      self._storage_api.push(item, content)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content(DISK_FILE_CHUNK))
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
                              item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception(exc)
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self.use_zip:
          stream = zip_decompress(stream, DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.warning('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      Item objects that are missing from the server.
    """
    channel = threading_utils.TaskChannel()
    pending = 0
    # Enqueue all requests.
    for batch in self.batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, WorkerPool.HIGH, self._storage_api.contains, batch)
      pending += 1
    # Yield results as they come in.
    for _ in xrange(pending):
      for missing in channel.pull():
        yield missing

  @staticmethod
  def batch_items_for_check(items):
    """Splits list of items to check for existence on the server into batches.

    Each batch corresponds to a single 'exists?' query to the server via a call
    to StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects.

    Yields:
      Batches of items to query for existence in a single operation,
      each batch is a list of Item objects.
    """
    batch_count = 0
    batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
    next_queries = []
    # Largest files first: they are most likely to have changed (see
    # ITEMS_PER_CONTAINS_QUERIES comment).
    for item in sorted(items, key=lambda x: x.size, reverse=True):
      next_queries.append(item)
      if len(next_queries) == batch_size_limit:
        yield next_queries
        next_queries = []
        batch_count += 1
        batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    if next_queries:
      yield next_queries
622
623
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    # Channel async fetches report back on (see Storage.async_fetch).
    self._channel = threading_utils.TaskChannel()
    # Digests currently being fetched.
    self._pending = set()
    # Every digest ever requested via add(); checked by verify_all_cached().
    self._accessed = set()
    # Digests already present in the cache.
    self._fetched = cache.cached_set()

  def add(self, priority, digest, size=UNKNOWN_FILE_SIZE):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    # Reads the whole file in memory; intended for small files only.
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
715
716
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    # Iterable of fetched chunks.
    self.stream = stream
    # Expected total size in bytes; UNKNOWN_FILE_SIZE disables the size check.
    self.expected_size = expected_size
    # Number of bytes seen so far.
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          # IOError here was thrown into the generator by the consumer (per
          # the docstring), not raised by the fetch itself.
          raise MappingError('Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise MappingError('Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
760
761
class StorageApi(object):
  """Base interface for low-level storage backends.

  Concrete implementations know how to fetch and push blobs and how to query
  the backend for missing items.
  """

  def get_fetch_url(self, digest):
    """Returns a URL from which an item with |digest| can be downloaded.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL string, or None when the protocol has no notion of fetch URLs.
    """
    raise NotImplementedError()

  def fetch(self, digest):
    """Retrieves an object from the storage and yields its content.

    Arguments:
      digest: hash digest of item to download.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, content):
    """Uploads |item| using data produced by the |content| generator.

    Arguments:
      item: Item object that holds information about an item being pushed.
      content: a generator that yields chunks to push.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks which of the given |items| are absent from the storage.

    Mutates |items|: an opaque implementation-specific object is assigned to
    the push_state attribute of every Item missing from the datastore.

    Arguments:
      items: list of Item objects.

    Returns:
      The subset of |items| missing on the server, as a list of Item objects.
    """
    raise NotImplementedError()
812
813
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  """

  class _PushState(object):
    """State needed to call .push(), to be stored in Item.push_state."""
    def __init__(self, upload_url, finalize_url):
      # URL to PUT the raw content to.
      self.upload_url = upload_url
      # Optional URL to POST to once the upload completed; may be falsy.
      self.finalize_url = finalize_url
      # Flags used to make a retried push() skip already completed steps.
      self.uploaded = False
      self.finalized = False

  def __init__(self, base_url, namespace):
    """|base_url| is the root URL of the isolate server, |namespace| selects
    the hash algorithm and compression scheme used by the server."""
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self.base_url = base_url.rstrip('/')
    self.namespace = namespace
    # Guards the lazy one-time handshake in _server_capabilities.
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.
    with self._lock:
      if self._server_caps is None:
        request_body = json.dumps(
            self._generate_handshake_request(), separators=(',', ':'))
        response = net.url_read(
            url=self.base_url + '/content-gs/handshake',
            data=request_body,
            content_type='application/json',
            method='POST')
        if response is None:
          raise MappingError('Failed to perform handshake.')
        try:
          caps = json.loads(response)
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise MappingError('Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  def get_fetch_url(self, digest):
    """Returns the /content-gs/retrieve URL for |digest|.

    See StorageApi.get_fetch_url for the contract.
    """
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self.base_url, self.namespace, digest)

  def fetch(self, digest):
    """Streams the content of |digest| from the server.

    See StorageApi.fetch for the contract. Raises IOError when a connection
    can not be opened.
    """
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s)', source_url)

    # Because the app engine DB is only eventually consistent, retry 404 errors
    # because the file might just not be visible yet (even though it has been
    # uploaded).
    connection = net.url_open(
        source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
    if not connection:
      raise IOError('Unable to open connection to %s' % source_url)
    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, content):
    """Uploads |item| content to the URLs captured in item.push_state.

    See StorageApi.push for the contract. The item must have been returned by
    a previous contains() call, which attaches the _PushState.
    """
    assert isinstance(item, Item)
    assert isinstance(item.push_state, IsolateServer._PushState)
    assert not item.push_state.finalized

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not item.push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=item.push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, item.push_state.upload_url))
      item.push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if item.push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      response = net.url_read(
          url=item.push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      item.push_state.finalized = True

  def contains(self, items):
    """Asks the server which of |items| are missing via /pre-upload.

    See StorageApi.contains for the contract. Missing items get a _PushState
    with the upload and finalize URLs returned by the server.
    """
    logging.info('Checking existence of %d files...', len(items))

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.is_isolated),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self.base_url,
        self.namespace,
        urllib.quote(self._server_capabilities['access_token']))
    response_body = net.url_read(
        url=query_url,
        data=json.dumps(body, separators=(',', ':')),
        content_type='application/json',
        method='POST')
    if response_body is None:
      raise MappingError('Failed to execute /pre-upload query')

    # Response body is a list of push_urls (or null if file is already present).
    try:
      response = json.loads(response_body)
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise MappingError(
          'Invalid response from server: %s, body is %s' % (err, response_body))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = []
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        item = items[i]
        assert item.push_state is None
        item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
        missing_items.append(item)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001011
1012
class FileSystem(StorageApi):
  """StorageApi implementation backed by a directory on the file system.

  The common use case is a NFS/CIFS file server that is mounted locally that is
  used to fetch the file on a local partition.
  """

  def __init__(self, base_path):
    super(FileSystem, self).__init__()
    # Directory where items are stored, one file per digest.
    self.base_path = base_path

  def get_fetch_url(self, digest):
    # Plain files have no URL to fetch from.
    return None

  def fetch(self, digest):
    assert isinstance(digest, basestring)
    return file_read(os.path.join(self.base_path, digest))

  def push(self, item, content):
    assert isinstance(item, Item)
    file_write(os.path.join(self.base_path, item.digest), content)

  def contains(self, items):
    # An item exists iff a file named after its digest exists.
    missing = []
    for item in items:
      if not os.path.exists(os.path.join(self.base_path, item.digest)):
        missing.append(item)
    return missing
1040
1041
class LocalCache(object):
  """Interface for a local store of objects fetched through Storage.

  Implementations may be used from several threads at once, so they must
  guard their internal state with a lock.
  """

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a new set with the digests of all items currently cached."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Checks an item for corruption and refreshes its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Drops the item from the cache, if present."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns the whole content of a cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Consumes the |content| generator and stores the data in the cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()
1092
1093
1094class MemoryCache(LocalCache):
1095 """LocalCache implementation that stores everything in memory."""
1096
1097 def __init__(self):
1098 super(MemoryCache, self).__init__()
1099 # Let's not assume dict is thread safe.
1100 self._lock = threading.Lock()
1101 self._contents = {}
1102
1103 def cached_set(self):
1104 with self._lock:
1105 return set(self._contents)
1106
1107 def touch(self, digest, size):
1108 with self._lock:
1109 return digest in self._contents
1110
1111 def evict(self, digest):
1112 with self._lock:
1113 self._contents.pop(digest, None)
1114
1115 def read(self, digest):
1116 with self._lock:
1117 return self._contents[digest]
1118
1119 def write(self, digest, content):
1120 # Assemble whole stream before taking the lock.
1121 data = ''.join(content)
1122 with self._lock:
1123 self._contents[digest] = data
1124
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001125 def hardlink(self, digest, dest, file_mode):
1126 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001127 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001128 if file_mode is not None:
1129 # Ignores all other bits.
1130 os.chmod(dest, file_mode & 0500)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001131
1132
def get_hash_algo(_namespace):
  """Returns the hash algorithm class used when uploading to |_namespace|.

  Currently every namespace uses SHA-1.
  """
  # TODO(vadimsh): Pick the algorithm based on the namespace at some point.
  return hashlib.sha1
1137
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001138
def is_namespace_with_compression(namespace):
  """Returns True if objects stored in |namespace| are compressed."""
  return any(namespace.endswith(suffix) for suffix in ('-gzip', '-deflate'))
1142
1143
def get_storage_api(file_or_url, namespace):
  """Instantiates the StorageApi implementation matching |file_or_url|.

  URLs are served by IsolateServer; anything else is treated as a local or
  mounted directory and served by FileSystem.
  """
  if not file_path.is_url(file_or_url):
    return FileSystem(file_or_url)
  return IsolateServer(file_or_url, namespace)
1150
1151
def get_storage(file_or_url, namespace):
  """Returns Storage class configured with appropriate StorageApi instance."""
  storage_api = get_storage_api(file_or_url, namespace)
  use_compression = is_namespace_with_compression(namespace)
  return Storage(storage_api, use_compression)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001157
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001158
def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    indir: Root directory the infiles are based in.
    infiles: dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server.
  """
  storage = get_storage(base_url, namespace)
  with storage:
    storage.upload_tree(indir, infiles)
  return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001173
1174
def load_isolated(content, os_flavor, algo):
  """Verifies the .isolated file is valid and loads this object with the json
  data.

  Arguments:
  - content: raw serialized content to load.
  - os_flavor: OS to load this file on. Optional.
  - algo: hashlib algorithm class. Used to confirm the algorithm matches the
    algorithm used on the Isolate Server.

  Returns:
    The parsed and normalized data as a dict.

  Raises:
    ConfigError if |content| is not a valid .isolated description.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  # Check 'version' first, since it could modify the parsing after.
  # TODO(maruel): Drop support for unversioned .isolated file around Jan 2014.
  value = data.get('version', '1.0')
  if not isinstance(value, basestring):
    raise ConfigError('Expected string, got %r' % value)
  if not re.match(r'^(\d+)\.(\d+)$', value):
    raise ConfigError('Expected a compatible version, got %r' % value)
  if value.split('.', 1)[0] != '1':
    raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)

  if algo is None:
    # TODO(maruel): Remove the default around Jan 2014.
    # Default the algorithm used in the .isolated file itself, falls back to
    # 'sha-1' if unspecified.
    algo = SUPPORTED_ALGOS_REVERSE[data.get('algo', 'sha-1')]

  # Validate every top-level key against the .isolated schema.
  for key, value in data.iteritems():
    if key == 'algo':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
      if value not in SUPPORTED_ALGOS:
        raise ConfigError(
            'Expected one of \'%s\', got %r' %
            (', '.join(sorted(SUPPORTED_ALGOS)), value))
      if value != SUPPORTED_ALGOS_REVERSE[algo]:
        raise ConfigError(
            'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))

    elif key == 'command':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        # Per-file properties: 'l' link target, 'm' file mode, 'h' content
        # hash, 's' size in bytes.
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            if not is_valid_hash(subsubvalue, algo):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        # A file entry is either a link ('l') or a content blob ('h' + 's');
        # the two forms are mutually exclusive.
        if bool('h' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
              subvalue)
        if bool('h' in subvalue) != bool('s' in subvalue):
          raise ConfigError(
              'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
              subvalue)
        if bool('s' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
              subvalue)
        if bool('l' in subvalue) and bool('m' in subvalue):
          raise ConfigError(
              'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not is_valid_hash(subvalue, algo):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not isinstance(value, bool):
        raise ConfigError('Expected bool, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      if os_flavor and value != os_flavor:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (os_flavor, value))

    elif key == 'version':
      # Already checked above.
      pass

    else:
      raise ConfigError('Unknown key %r' % key)

  # Automatically fix os.path.sep if necessary. While .isolated files are always
  # in the native path format, someone could want to download an .isolated
  # tree from another OS.
  wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
  if 'files' in data:
    data['files'] = dict(
        (k.replace(wrong_path_sep, os.path.sep), v)
        for k, v in data['files'].iteritems())
    for v in data['files'].itervalues():
      if 'l' in v:
        v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
  if 'relative_cwd' in data:
    data['relative_cwd'] = data['relative_cwd'].replace(
        wrong_path_sep, os.path.sep)
  return data
1315
1316
class IsolatedFile(object):
  """A single .isolated file, as parsed from its JSON serialization."""

  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)' % obj_hash)
    self.obj_hash = obj_hash
    self.algo = algo
    # Becomes True once everything on the left side of the include tree has
    # been parsed. 'Tree' here means the .isolate and all the .isolated files
    # recursively included by it with the 'includes' key. The order of the
    # hashes in 'includes' matters: later entries are not processed until the
    # earlier ones are retrieved and read.
    self.can_fetch = False

    # Raw parsed JSON data.
    self.data = {}
    # One IsolatedFile instance per entry in self.data['includes'].
    self.children = []

    # Flipped to True by load().
    self._is_parsed = False
    # Flipped to True by fetch_files().
    self.files_fetched = False

  def load(self, os_flavor, content):
    """Parses |content| as a valid .isolated file and populates this object."""
    logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    includes = self.data.get('includes', [])
    self.children = [IsolatedFile(h, self.algo) for h in includes]
    self._is_parsed = True

  def fetch_files(self, fetch_queue, files):
    """Adds files in this .isolated file not present in |files| dictionary.

    Preemptively request files.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if not self._is_parsed or self.files_fetched:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # The root .isolated has priority on the files being mapped; in
      # particular, overridden files must not be fetched again.
      if filepath in files:
        continue
      files[filepath] = properties
      if 'h' in properties:
        # Preemptively request the file content.
        logging.debug('fetching %s' % filepath)
        fetch_queue.add(WorkerPool.MED, properties['h'], properties['s'])
    self.files_fetched = True
1374
1375
class Settings(object):
  """Aggregated results of a completely parsed .isolated file tree."""

  def __init__(self):
    # Command line to run, if any.
    self.command = []
    # Mapping of file path -> properties for every file in the tree.
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None
1385
  def load(self, fetch_queue, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    Arguments:
      fetch_queue: object used to schedule and wait for downloads; it exposes
          add(), wait() and a .cache attribute.
      root_isolated_hash: hash digest of the root .isolated file.
      os_flavor: OS the tree is loaded for, passed through to load_isolated.
      algo: hashlib algorithm class used for all digests in the tree.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      # Schedules the fetch of one .isolated file at the highest priority.
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(WorkerPool.HIGH, h)

    retrieve(self.root)

    # Each iteration parses whichever .isolated file finished downloading
    # first, then schedules its includes.
    while pending:
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    def check(n):
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    # Normalize unset values to their defaults.
    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False
1442
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001443 def _traverse_tree(self, fetch_queue, node):
maruel@chromium.org41601642013-09-18 19:40:46 +00001444 if node.can_fetch:
1445 if not node.files_fetched:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001446 self._update_self(fetch_queue, node)
maruel@chromium.org41601642013-09-18 19:40:46 +00001447 will_break = False
1448 for i in node.children:
1449 if not i.can_fetch:
1450 if will_break:
1451 break
1452 # Automatically mark the first one as fetcheable.
1453 i.can_fetch = True
1454 will_break = True
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001455 self._traverse_tree(fetch_queue, i)
maruel@chromium.org41601642013-09-18 19:40:46 +00001456
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001457 def _update_self(self, fetch_queue, node):
1458 node.fetch_files(fetch_queue, self.files)
maruel@chromium.org41601642013-09-18 19:40:46 +00001459 # Grabs properties.
1460 if not self.command and node.data.get('command'):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001461 # Ensure paths are correctly separated on windows.
maruel@chromium.org41601642013-09-18 19:40:46 +00001462 self.command = node.data['command']
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001463 if self.command:
1464 self.command[0] = self.command[0].replace('/', os.path.sep)
1465 self.command = tools.fix_python_path(self.command)
maruel@chromium.org41601642013-09-18 19:40:46 +00001466 if self.read_only is None and node.data.get('read_only') is not None:
1467 self.read_only = node.data['read_only']
1468 if (self.relative_cwd is None and
1469 node.data.get('relative_cwd') is not None):
1470 self.relative_cwd = node.data['relative_cwd']
1471
1472
def fetch_isolated(
    isolated_hash, storage, cache, algo, outdir, os_flavor, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    algo: hash algorithm to use.
    outdir: Output directory to map file tree to.
    os_flavor: OS flavor to choose when reading sections of *.isolated file.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about loaded *.isolated file.

  Raises:
    ConfigError: if require_command is True but no command was specified.
    MappingError: if the cache evicted items that were just fetched.
  """
  # The cache is entered as a context manager so it can do its bookkeeping
  # on enter/exit.
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not is_valid_hash(isolated_hash, algo):
        isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)

      # Load all *.isolated and start loading rest of the files.
      settings.load(fetch_queue, isolated_hash, os_flavor, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise ConfigError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_symlinks(outdir, settings.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props). Several paths may
      # share the same content, hence the list of destinations per digest.
      remaining = {}
      for filepath, props in settings.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          # NOTE(review): props.get('m') is presumably the file mode passed
          # through to the hardlink/copy — confirm against LocalCache.hardlink.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise MappingError('Cache is too small to hold all requested files')
  return settings
1551
1552
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001553@subcommand.usage('<file1..fileN> or - to read from stdin')
1554def CMDarchive(parser, args):
1555 """Archives data to the server."""
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001556 options, files = parser.parse_args(args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001557
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001558 if files == ['-']:
1559 files = sys.stdin.readlines()
1560
1561 if not files:
1562 parser.error('Nothing to upload')
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001563
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001564 # Load the necessary metadata.
1565 # TODO(maruel): Use a worker pool to upload as the hashing is being done.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001566 infiles = dict(
1567 (
1568 f,
1569 {
maruel@chromium.orge5c17132012-11-21 18:18:46 +00001570 's': os.stat(f).st_size,
maruel@chromium.org385d73d2013-09-19 18:33:21 +00001571 'h': hash_file(f, get_hash_algo(options.namespace)),
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001572 }
1573 )
1574 for f in files)
1575
vadimsh@chromium.orga4326472013-08-24 02:05:41 +00001576 with tools.Profiler('Archive'):
maruel@chromium.org7b844a62013-09-17 13:04:59 +00001577 ret = upload_tree(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001578 base_url=options.isolate_server,
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001579 indir=os.getcwd(),
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001580 infiles=infiles,
1581 namespace=options.namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001582 if not ret:
1583 print '\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles))
1584 return ret
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001585
1586
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.

  Returns 0 on success; invalid flag combinations abort via parser.error().
  """
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Exactly one of the two modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)
  storage = get_storage(options.isolate_server, options.namespace)
  cache = MemoryCache()
  algo = get_hash_algo(options.namespace)

  # Fetching individual files.
  if options.file:
    channel = threading_utils.TaskChannel()
    # Maps digest -> destination path of fetches still in flight.
    pending = {}
    for digest, dest in options.file:
      pending[digest] = dest
      storage.async_fetch(
          channel,
          WorkerPool.MED,
          digest,
          UNKNOWN_FILE_SIZE,
          functools.partial(file_write, os.path.join(options.target, dest)))
    while pending:
      fetched = channel.pull()
      dest = pending.pop(fetched)
      logging.info('%s: %s', fetched, dest)

  # Fetching whole isolated tree.
  if options.isolated:
    settings = fetch_isolated(
        isolated_hash=options.isolated,
        storage=storage,
        cache=cache,
        algo=algo,
        outdir=options.target,
        os_flavor=None,
        require_command=False)
    # |rel| is already rooted at options.target; the previous code joined
    # options.target into it a second time, a no-op only because |rel| is
    # absolute (options.target was made absolute above).
    rel = os.path.join(options.target, settings.relative_cwd)
    print('To run this test please run from the directory %s:' % rel)
    print(' ' + ' '.join(settings.command))

  return 0
1647
1648
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser that adds --isolate-server and --namespace options."""

  def __init__(self, **kwargs):
    prog = os.path.basename(sys.modules[__name__].__file__)
    tools.OptionParserWithLogging.__init__(
        self, version=__version__, prog=prog, **kwargs)
    self.add_option(
        '-I', '--isolate-server',
        metavar='URL', default='',
        help='Isolate server to use')
    self.add_option(
        '--namespace', default='default-gzip',
        help='The namespace to use on the server, default: %default')

  def parse_args(self, *args, **kwargs):
    """Parses arguments, then normalizes and validates --isolate-server."""
    options, parsed_args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Drop any trailing slash so URLs can be built by simple concatenation.
    options.isolate_server = options.isolate_server.rstrip('/')
    if not options.isolate_server:
      self.error('--isolate-server is required.')
    return options, parsed_args
1671
1672
def main(args):
  """Dispatches to the matching CMD* handler; returns the exit code."""
  try:
    return subcommand.CommandDispatcher(__name__).execute(
        OptionParserIsolateServer(), args)
  except Exception as e:
    # Top-level boundary: report any unexpected failure and exit non-zero.
    tools.report_error(e)
    return 1
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001680
1681
if __name__ == '__main__':
  # Normalize stdio encoding/buffering and terminal colors before dispatching.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))