blob: 344872353237ae665e8c220b4a27a61e811d3dc9 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002# Copyright 2013 The Chromium Authors. All rights reserved.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00003# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Archives a set of files to a server."""
7
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00008__version__ = '0.2'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000010import hashlib
maruel@chromium.org41601642013-09-18 19:40:46 +000011import json
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000014import re
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000015import sys
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000016import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000017import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000018import urllib
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000019import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000020
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000021from third_party import colorama
22from third_party.depot_tools import fix_encoding
23from third_party.depot_tools import subcommand
24
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000025from utils import net
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000026from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000027from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000028
29
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# larger the possibility it has changed). Then the first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/pre-upload', then
# the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a
# trade-off; the more per request, the lower the effect of HTTP round trip
# latency and TCP-level chattiness. On the other hand, larger values cause
# longer lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousands files to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = [20, 20, 50, 50, 50, 100]


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'pdf', 'png',
    'wav', 'zip'
]


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# The size of each chunk to read when downloading and unzipping files.
ZIPPED_FILE_CHUNK = 16 * 1024

# Chunk size to use when doing disk I/O.
DISK_FILE_CHUNK = 1024 * 1024

# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download is aborted.
DOWNLOAD_READ_TIMEOUT = 60

# Maximum expected delay (in seconds) between successive file fetches
# in run_tha_test. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Sadly, hashlib uses 'sha1' instead of the standard 'sha-1' so explicitly
# specify the names here.
SUPPORTED_ALGOS = {
    'md5': hashlib.md5,
    'sha-1': hashlib.sha1,
    'sha-512': hashlib.sha512,
}


# Used for serialization: maps a hashlib constructor back to its name.
SUPPORTED_ALGOS_REVERSE = dict(
    (algo, name) for name, algo in SUPPORTED_ALGOS.items())
98
99
class ConfigError(ValueError):
  """Raised on any failure to load a .isolated file."""
103
104
class MappingError(OSError):
  """Raised when the tree could not be recreated."""
108
109
def is_valid_hash(value, algo):
  """Returns if the value is a valid hash for the corresponding algorithm.

  Arguments:
    value: candidate hex digest string.
    algo: hashlib-style constructor; its digest_size defines the expected
          length (two hex characters per digest byte).

  Returns:
    True if |value| consists of exactly the expected number of hex digits.
  """
  size = 2 * algo().digest_size
  # Use \Z rather than $: '$' also matches just before a trailing newline,
  # which would accept '<digest>\n' as a valid hash.
  return bool(re.match(r'^[a-fA-F0-9]{%d}\Z' % size, value))
114
115
def hash_file(filepath, algo):
  """Returns the hex digest of a file, reading it in bounded chunks.

  The file is never loaded in memory all at once; it is consumed
  DISK_FILE_CHUNK bytes at a time.

  Arguments:
    filepath: path of the file to hash.
    algo: hashlib-style constructor (e.g. hashlib.sha1).
  """
  hasher = algo()
  with open(filepath, 'rb') as stream:
    block = stream.read(DISK_FILE_CHUNK)
    while block:
      hasher.update(block)
      block = stream.read(DISK_FILE_CHUNK)
  return hasher.hexdigest()
129
130
def stream_read(stream, chunk_size):
  """Yields successive chunks read from |stream| until it is exhausted.

  Arguments:
    stream: object with a read(size) method.
    chunk_size: number of bytes requested per read() call.
  """
  piece = stream.read(chunk_size)
  while piece:
    yield piece
    piece = stream.read(chunk_size)
138
139
def file_read(filepath, chunk_size=DISK_FILE_CHUNK):
  """Generates the content of a file, |chunk_size| bytes at a time.

  The file is opened in binary mode when the generator is first advanced and
  is closed once the content is exhausted.
  """
  with open(filepath, 'rb') as stream:
    piece = stream.read(chunk_size)
    while piece:
      yield piece
      piece = stream.read(chunk_size)
148
149
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Arguments:
    filepath: destination path; may be a bare filename relative to the current
              directory.
    content_generator: iterable of chunks to write, in order.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  # A bare filename has an empty dirname; os.makedirs('') would raise, so only
  # create the directory when there is actually one to create.
  if filedir and not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000168
169
def zip_compress(content_generator, level=7):
  """Generates zlib-compressed chunks from the chunks of |content_generator|.

  Empty outputs from the compressor are skipped, so every yielded chunk is
  non-empty. The stream is terminated with a Z_FINISH flush.
  """
  squeezer = zlib.compressobj(level)
  for raw in content_generator:
    packed = squeezer.compress(raw)
    if not packed:
      # The compressor is still buffering; nothing to emit yet.
      continue
    yield packed
  remainder = squeezer.flush(zlib.Z_FINISH)
  if remainder:
    yield remainder
180
181
def zip_decompress(content_generator, chunk_size=DISK_FILE_CHUNK):
  """Generates decompressed data from the zipped chunks of |content_generator|.

  Each call into zlib is capped at |chunk_size| bytes of output, so that a zip
  bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if zlib reports an error or if some input is left unprocessed
  once the stream ends.
  """
  inflater = zlib.decompressobj()
  bytes_in = 0
  try:
    for piece in content_generator:
      bytes_in += len(piece)
      # Feed the new chunk first, then keep draining whatever zlib could not
      # process because of the output cap.
      pending = piece
      while True:
        inflated = inflater.decompress(pending, chunk_size)
        if inflated:
          yield inflated
        pending = inflater.unconsumed_tail
        if not pending:
          break
    leftover = inflater.flush()
    if leftover:
      yield leftover
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (bytes_in, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
211
212
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store only) for extensions listed in ALREADY_COMPRESSED_TYPES and
  7 for everything else.
  """
  # os.path.splitext() keeps the leading period ('.jpg') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions ('jpg'); drop the period so
  # the lookup can actually match.
  file_ext = os.path.splitext(filename)[1].lower()[1:]
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
218
219
def create_directories(base_directory, files):
  """Creates under |base_directory| every directory the given files need.

  Arguments:
    base_directory: directory in which the tree is created.
    files: relative file paths; only their directory parts are used.
  """
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect each file's parent directory along with all of its ancestors.
  needed = set()
  for filepath in files:
    parent = os.path.dirname(filepath)
    while parent:
      needed.add(parent)
      parent = os.path.dirname(parent)
  # Sorting puts a parent before its children, so plain mkdir() is enough.
  for relpath in sorted(needed):
    os.mkdir(os.path.join(base_directory, relpath))
232
233
def create_links(base_directory, files):
  """Creates the symlinks described by the given (path, properties) pairs.

  Only entries with an 'l' (link target) property are considered. On Windows
  such entries are logged and skipped. When an 'm' (mode) property is present
  and the platform provides os.lchmod, the mode is applied to the link itself.
  """
  on_windows = sys.platform == 'win32'
  for relpath, props in files:
    if 'l' in props:
      if on_windows:
        # TODO(maruel): Create junctions or empty text files similar to what
        # cygwin do?
        logging.warning('Ignoring symlink %s', relpath)
      else:
        link_path = os.path.join(base_directory, relpath)
        # symlink doesn't exist on Windows. So the 'link' property should
        # never be specified for windows .isolated file.
        os.symlink(props['l'], link_path)  # pylint: disable=E1101
        if 'm' in props:
          lchmod = getattr(os, 'lchmod', None)
          if lchmod:
            lchmod(link_path, props['m'])
252
253
def generate_remaining_files(files):
  """Groups the files that have a hash by that hash.

  Arguments:
    files: iterable of (filepath, properties) pairs.

  Returns:
    dict mapping each 'h' value to the list of (filepath, properties) pairs
    carrying it, in input order. Entries without an 'h' property are left out.
  """
  by_hash = {}
  for filepath, props in files:
    if 'h' not in props:
      continue
    digest = props['h']
    if digest not in by_hash:
      by_hash[digest] = []
    by_hash[digest].append((filepath, props))
  return by_hash
262
263
def is_valid_file(filepath, size):
  """Returns True if the file at |filepath| appears valid.

  The only check done is on the file size; when |size| is UNKNOWN_FILE_SIZE the
  file is accepted without being looked at.
  """
  if size == UNKNOWN_FILE_SIZE:
    return True
  size_on_disk = os.stat(filepath).st_size
  if size == size_on_disk:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), size_on_disk, size)
  return False
278
279
def try_remove(filepath):
  """Deletes a file on a best-effort basis.

  Any OSError (e.g. the file does not exist) is deliberately ignored.
  """
  try:
    os.remove(filepath)
  except OSError:
    # Nothing to do: failing to remove is acceptable here.
    return
286
287
class Item(object):
  """An item to push to Storage.

  Its life starts in the main thread; it then travels to the 'contains'
  thread, to the 'push' thread, and finally back to the main thread.

  It is never used concurrently from multiple threads.
  """

  def __init__(self, digest, size, is_isolated=False):
    # Hash digest and size of the item.
    self.digest = digest
    self.size = size
    # Whether this item is flagged as an isolated item by its creator.
    self.is_isolated = is_isolated
    # Default zip level; subclasses may override it.
    self.compression_level = 6
    # Storage-specific state; None until something assigns it.
    self.push_state = None

  def content(self, chunk_size):
    """Iterable with content of this item in chunks of given size.

    Must be implemented by subclasses.
    """
    raise NotImplementedError()
307
308
class FileItem(Item):
  """An Item whose content is read from a file on disk."""

  def __init__(self, path, digest, size, is_isolated):
    super(FileItem, self).__init__(digest, size, is_isolated)
    # Path of the file holding the content.
    self.path = path
    # Replace the default level with one picked from the file name.
    self.compression_level = get_zip_compression_level(path)

  def content(self, chunk_size):
    """Returns a generator over the file content, |chunk_size| at a time."""
    return file_read(self.path, chunk_size)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000319
320
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Thread pools are created lazily on first access and released by close().
  Instances can be used as context managers; close() is called on exit.
  """

  def __init__(self, storage_api, use_zip):
    """
    Arguments:
      storage_api: object providing the 'contains' and 'push' operations used
                   by this class (see StorageApi).
      use_zip: if true, item content is zipped before being pushed.
    """
    self.use_zip = use_zip
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = WorkerPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    # Never swallow an exception raised inside the 'with' body.
    return False

  def upload_tree(self, indir, infiles):
    """Uploads the given tree to the isolate server.

    Arguments:
      indir: root directory the infiles are based in.
      infiles: dict of files to upload from |indir|.
    """
    logging.info('upload tree(indir=%s, files=%d)', indir, len(infiles))

    # TODO(vadimsh): Introduce Item as a part of the public interface?

    # Convert |indir| + |infiles| into a list of FileItem objects.
    # Filter out symlinks, since they are not represented by items on isolate
    # server side.
    items = [
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            is_isolated=metadata.get('priority') == '0')
        for filepath, metadata in infiles.iteritems()
        if 'l' not in metadata
    ]

    # Enqueue all upload tasks.
    missing = set()
    channel = threading_utils.TaskChannel()
    for missing_item in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(
          channel,
          WorkerPool.HIGH if missing_item.is_isolated else WorkerPool.MED,
          missing_item)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        uploaded = 0
        while uploaded != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded += 1
          logging.debug(
              'Uploaded %d / %d: %s', uploaded, len(missing), item.digest)
    logging.info('All files are uploaded')

    # Print stats. Both ratios are guarded against a zero denominator: an
    # empty tree (or one made only of symlinks) has no items at all, and a
    # tree of empty files has a total size of zero.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total:      %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total if total else 0,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total if total else 0,
        cache_miss_size * 100. / total_size if total_size else 0)

  def async_push(self, channel, priority, item):
    """Starts asynchronous push to the server in a parallel thread.

    Arguments:
      channel: TaskChannel object that receives back |item| when upload ends.
      priority: thread pool task priority for the push.
      item: item to upload as instance of Item class.
    """
    def push(content, size):
      """Pushes an item and returns its id, to pass as a result to |channel|."""
      self._storage_api.push(item, content, size)
      return item

    # If zipping is not required, just start a push task.
    if not self.use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push,
          item.content(DISK_FILE_CHUNK), item.size)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        stream = zip_compress(item.content(ZIPPED_FILE_CHUNK),
                              item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        # Report the failure through the channel so the consumer pulling from
        # it sees the exception.
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception(exc)
        return
      # The zipped size is passed as unknown; the data is handed over as a
      # single-element list.
      self.net_thread_pool.add_task_with_channel(channel, priority, push,
          [data], UNKNOWN_FILE_SIZE)
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      Item objects that are missing from the server.
    """
    channel = threading_utils.TaskChannel()
    pending = 0
    # Enqueue all requests.
    for batch in self.batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(channel, WorkerPool.HIGH,
          self._storage_api.contains, batch)
      pending += 1
    # Yield results as they come in.
    for _ in xrange(pending):
      for missing in channel.pull():
        yield missing

  @staticmethod
  def batch_items_for_check(items):
    """Splits list of items to check for existence on the server into batches.

    Each batch corresponds to a single 'exists?' query to the server via a call
    to StorageApi's 'contains' method.

    Items are taken largest first. Batch sizes follow
    ITEMS_PER_CONTAINS_QUERIES, with its last entry reused once the list is
    exhausted.

    Arguments:
      items: a list of Item objects.

    Yields:
      Batches of items to query for existence in a single operation,
      each batch is a list of Item objects.
    """
    batch_count = 0
    batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
    next_queries = []
    for item in sorted(items, key=lambda x: x.size, reverse=True):
      next_queries.append(item)
      if len(next_queries) == batch_size_limit:
        yield next_queries
        next_queries = []
        batch_count += 1
        batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
            min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    # Flush the last, partially filled batch.
    if next_queries:
      yield next_queries
521
522
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  Every method here raises NotImplementedError; implementations override them.
  """

  def fetch(self, digest, size):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of the item to download.
      size: expected size of the item, used to validate it.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, content, size):
    """Uploads an |item| whose content is produced by the |content| generator.

    Arguments:
      item: Item object describing what is being pushed.
      content: a generator that yields the chunks to push.
      size: expected size of the stream produced by |content|.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks which of the given |items| exist on the server.

    Mutates |items|: for entries missing from the datastore, an opaque,
    implementation specific object is assigned to the Item's push_state
    attribute.

    Arguments:
      items: list of Item objects.

    Returns:
      The items missing on the server, as a list of Item objects.
    """
    raise NotImplementedError()
564
565
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000566class IsolateServer(StorageApi):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000567 """StorageApi implementation that downloads and uploads to Isolate Server.
568
569 It uploads and downloads directly from Google Storage whenever appropriate.
570 """
571
class _PushState(object):
  """State needed to call .push(), to be stored in Item.push_state."""

  def __init__(self, upload_url, finalize_url):
    # Nothing has been uploaded nor finalized yet.
    self.uploaded = False
    self.finalized = False
    # URL the content is sent to, and URL hit once the upload is done.
    self.upload_url = upload_url
    self.finalize_url = finalize_url
579
def __init__(self, base_url, namespace):
  """
  Arguments:
    base_url: URL of the server; must start with 'http'. Trailing slashes are
              dropped.
    namespace: namespace name; the hashing algorithm and the use of
               compression are both derived from it.
  """
  super(IsolateServer, self).__init__()
  assert base_url.startswith('http'), base_url
  self.base_url = base_url.rstrip('/')
  self.namespace = namespace
  self.algo = get_hash_algo(namespace)
  self._use_zip = is_namespace_with_compression(namespace)
  # Guards the lazily fetched handshake result stored in _server_caps.
  self._lock = threading.Lock()
  self._server_caps = None
589
@staticmethod
def _generate_handshake_request():
  """Builds the dict that is sent as the body of the handshake request."""
  # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
  return dict(
      client_app_version=__version__,
      fetcher=True,
      protocol_version=ISOLATE_PROTOCOL_VERSION,
      pusher=True)
600
@staticmethod
def _validate_handshake_response(caps):
  """Validates and normalizes handshake response.

  Arguments:
    caps: dict decoded from the body of the handshake response.

  Returns:
    |caps|, once it has been validated.

  Raises:
    MappingError if the response carries a non-empty 'error' entry.
    KeyError if 'protocol_version' or 'server_app_version' is absent.
    ValueError if 'access_token' is absent or empty.
  """
  # Look at the error first: a rejection is not guaranteed to carry the other
  # fields, and failing on one of those would hide the server's own message.
  if caps.get('error'):
    raise MappingError(caps['error'])
  logging.info('Protocol version: %s', caps['protocol_version'])
  logging.info('Server version: %s', caps['server_app_version'])
  # .get() so that an absent token is reported the same way as an empty one.
  if not caps.get('access_token'):
    raise ValueError('access_token is missing')
  return caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000611
@property
def _server_capabilities(self):
  """Returns the server capabilities, doing the handshake on first access.

  The handshake result is cached in self._server_caps; the whole exchange
  happens while holding self._lock.

  Returns:
    Server capabilities dictionary as returned by /handshake endpoint.

  Raises:
    MappingError if server rejects the handshake.
  """
  # TODO(maruel): Make this request much earlier asynchronously while the
  # files are being enumerated.
  with self._lock:
    if self._server_caps is not None:
      return self._server_caps
    body = json.dumps(
        self._generate_handshake_request(), separators=(',', ':'))
    response = net.url_read(
        url=self.base_url + '/content-gs/handshake',
        data=body,
        content_type='application/json',
        method='POST')
    if response is None:
      raise MappingError('Failed to perform handshake.')
    try:
      decoded = json.loads(response)
      if not isinstance(decoded, dict):
        raise ValueError('Expecting JSON dict')
      self._server_caps = self._validate_handshake_response(decoded)
    except (ValueError, KeyError, TypeError) as exc:
      # str() of a KeyError is only the missing key, which is confusing on
      # its own, so the exception class name is reported as well.
      raise MappingError('Invalid handshake response (%s): %s' % (
          exc.__class__.__name__, exc))
    return self._server_caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000647
def fetch(self, digest, size):
  """Downloads an item from the server and yields its content in chunks.

  Arguments:
    digest: hash digest of the item, as a string.
    size: expected size of the content, or UNKNOWN_FILE_SIZE to skip the
          check.

  Yields:
    Chunks of content, decompressed when self._use_zip is set.

  Raises:
    IOError if the connection can't be opened, if reading or decompressing
    fails with IOError, or if the total size doesn't match |size|.
  """
  assert isinstance(digest, basestring)
  assert (isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE)

  source_url = '%s/content-gs/retrieve/%s/%s' % (
      self.base_url, self.namespace, digest)
  logging.debug('download_file(%s)', source_url)

  # Because the app engine DB is only eventually consistent, retry 404 errors
  # because the file might just not be visible yet (even though it has been
  # uploaded).
  connection = net.url_open(
      source_url, retry_404=True, read_timeout=DOWNLOAD_READ_TIMEOUT)
  if not connection:
    raise IOError('Unable to open connection to %s' % source_url)

  try:
    # Build the reading pipeline: network chunks, optionally unzipped.
    chunks = stream_read(connection, NET_IO_FILE_CHUNK)
    if self._use_zip:
      chunks = zip_decompress(chunks, DISK_FILE_CHUNK)

    # Hand the data over while counting how much of it went through.
    received = 0
    for piece in chunks:
      received += len(piece)
      yield piece

    # The count can only be checked once the whole stream has been consumed.
    size_is_known = size != UNKNOWN_FILE_SIZE
    if size_is_known and received != size:
      raise IOError('Incorrect file size: expected %d, got %d' % (
          size, received))

  except IOError as err:
    logging.warning('Failed to fetch %s: %s', digest, err)
    raise
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000684
def push(self, item, content, size):
  """Uploads |item| and, when a finalize URL is set, finalizes the upload.

  Progress is recorded in item.push_state, so a call made after a failed
  finalization skips the upload and only finalizes again.

  Arguments:
    item: Item whose push_state is an IsolateServer._PushState that is not
          finalized yet.
    content: list or generator of chunks to upload.
    size: not used by this implementation.

  Raises:
    IOError if the upload or the finalization request fails.
  """
  assert isinstance(item, Item)
  assert isinstance(item.push_state, IsolateServer._PushState)
  assert not item.push_state.finalized
  state = item.push_state

  # TODO(vadimsh): Do not read from |content| generator when retrying push.
  # If |content| is indeed a generator, it can not be re-winded back
  # to the beginning of the stream. A retry will find it exhausted. A possible
  # solution is to wrap |content| generator with some sort of caching
  # restartable generator. It should be done alongside streaming support
  # implementation.

  if state.uploaded:
    # This push operation is a retry after a failed finalization call below,
    # no need to reupload contents in that case.
    logging.info(
        'A file %s already uploaded, retrying finalization only', item.digest)
  else:
    # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      payload = content[0]
    else:
      payload = ''.join(content)
    # PUT file to |upload_url|.
    response = net.url_read(
        url=state.upload_url,
        data=payload,
        content_type='application/octet-stream',
        method='PUT')
    if response is None:
      raise IOError('Failed to upload a file %s to %s' % (
          item.digest, state.upload_url))
    state.uploaded = True

  # Optionally notify the server that it's done.
  if state.finalize_url:
    # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
    # send it to isolated server. That way isolate server can verify that
    # the data safely reached Google Storage (GS provides MD5 and CRC32C of
    # stored files).
    response = net.url_read(
        url=state.finalize_url,
        data='',
        content_type='application/json',
        method='POST')
    if response is None:
      raise IOError('Failed to finalize an upload of %s' % item.digest)
  state.finalized = True
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +0000734
def contains(self, items):
  """Asks the server which of |items| are missing and prepares them for push.

  A single POST to /content-gs/pre-upload/<namespace> describes every item.
  The server is expected to answer with a json list of the same length, each
  entry being either a false value (file already present) or a pair of
  [upload_url, finalize_url].

  Arguments:
    items: sequence of Item instances; each must have |push_state| unset.

  Returns:
    List of the items the server does not have, each with a fresh _PushState
    attached to |item.push_state|.

  Raises:
    MappingError if the request fails or the response is malformed.
  """
  logging.info('Checking existence of %d files...', len(items))

  # Request body is a json encoded list of dicts.
  body = [
      {
        'h': item.digest,
        's': item.size,
        'i': int(item.is_isolated),
      } for item in items
  ]

  query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
      self.base_url,
      self.namespace,
      urllib.quote(self._server_capabilities['access_token']))
  response_body = net.url_read(
      url=query_url,
      data=json.dumps(body, separators=(',', ':')),
      content_type='application/json',
      method='POST')
  if response_body is None:
    raise MappingError('Failed to execute /pre-upload query')

  # Response body is a list of push_urls (or null if file is already present).
  try:
    response = json.loads(response_body)
    if not isinstance(response, list):
      raise ValueError('Expecting response with json-encoded list')
    if len(response) != len(items):
      raise ValueError(
          'Incorrect number of items in the list, expected %d, '
          'but got %d' % (len(items), len(response)))
    # The entries come from the network: validate them explicitly instead of
    # with an assert (which disappears under -O), and do it for the whole
    # list before touching any item so a bad response leaves items untouched.
    for push_urls in response:
      if push_urls and not (
          isinstance(push_urls, list) and len(push_urls) == 2):
        raise ValueError(
            'Expecting a list of 2 push urls, got %r' % (push_urls,))
  except ValueError as err:
    raise MappingError(
        'Invalid response from server: %s, body is %s' % (err, response_body))

  # Pick Items that are missing, attach _PushState to them.
  missing_items = []
  for item, push_urls in zip(items, response):
    if push_urls:
      # Internal invariant: an item is only ever queried once.
      assert item.push_state is None
      item.push_state = IsolateServer._PushState(push_urls[0], push_urls[1])
      missing_items.append(item)
  logging.info('Queried %d files, %d cache hit',
      len(items), len(items) - len(missing_items))
  return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000784
785
class FileSystem(StorageApi):
  """StorageApi implementation backed by a directory on the file system.

  Every object is stored as a file named after its digest directly under
  |base_path|. The common use case is a NFS/CIFS file server that is mounted
  locally that is used to fetch the file on a local partition.
  """

  def __init__(self, base_path):
    super(FileSystem, self).__init__()
    self.base_path = base_path

  def _path_for(self, digest):
    """Returns the location of the object named |digest| in the store."""
    return os.path.join(self.base_path, digest)

  def fetch(self, digest, size):
    """Returns the content of |digest| as produced by file_read().

    Raises IOError when |size| is known and is_valid_file() rejects the
    stored file.
    """
    assert isinstance(digest, basestring)
    assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
    path = self._path_for(digest)
    if size == UNKNOWN_FILE_SIZE or is_valid_file(path, size):
      return file_read(path)
    raise IOError('Invalid file %s' % digest)

  def push(self, item, content, size):
    """Writes |content| as |item.digest|.

    When |size| is known and differs from what file_write() reports, the
    file is removed again and IOError is raised.
    """
    assert isinstance(item, Item)
    assert isinstance(size, (int, long)) or size == UNKNOWN_FILE_SIZE
    path = self._path_for(item.digest)
    written = file_write(path, content)
    if size == UNKNOWN_FILE_SIZE or written == size:
      return
    os.remove(path)
    raise IOError('Invalid file %s, %d != %d' % (item.digest, written, size))

  def contains(self, items):
    """Returns the subset of |items| with no file present in the store."""
    missing = []
    for candidate in items:
      if not os.path.exists(self._path_for(candidate.digest)):
        missing.append(candidate)
    return missing
819
820
def get_hash_algo(_namespace):
  """Returns the hashlib constructor to hash content for |_namespace| with.

  The namespace is currently ignored: hashlib.sha1 is always returned.
  """
  # TODO(vadimsh): Implement this at some point.
  algo = hashlib.sha1
  return algo
825
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000826
def is_namespace_with_compression(namespace):
  """Tells whether objects in |namespace| are stored compressed.

  A namespace is considered compressed when its name ends with '-gzip' or
  '-deflate'.
  """
  compressed_suffixes = ('-gzip', '-deflate')
  return any(namespace.endswith(suffix) for suffix in compressed_suffixes)
830
831
def get_storage_api(file_or_url, namespace):
  """Builds the StorageApi implementation matching |file_or_url|.

  An http:// or https:// location gets an IsolateServer bound to |namespace|;
  anything else is treated as a directory and gets a FileSystem (which does
  not use |namespace|).
  """
  is_url = re.match(r'^https?://.+$', file_or_url) is not None
  if not is_url:
    return FileSystem(file_or_url)
  return IsolateServer(file_or_url, namespace)
838
839
class WorkerPool(threading_utils.AutoRetryThreadPool):
  """Thread pool that automatically retries on IOError and runs a preconfigured
  function.

  The pool is configured entirely from the class constants below; callers
  instantiate it without arguments.
  """
  # Initial and maximum number of worker threads.
  INITIAL_WORKERS = 2
  MAX_WORKERS = 16
  # Passed to the base class together with [IOError]; presumably the number of
  # attempts made for a task that raises IOError -- confirm against
  # threading_utils.AutoRetryThreadPool.
  RETRIES = 5

  def __init__(self):
    # NOTE(review): arguments are positional; the meaning of the trailing 0
    # and 'remote' is defined by AutoRetryThreadPool.__init__ and is not
    # visible from here (likely a queue size and a thread name prefix).
    super(WorkerPool, self).__init__(
        [IOError],
        self.RETRIES,
        self.INITIAL_WORKERS,
        self.MAX_WORKERS,
        0,
        'remote')
maruel@chromium.orgdedbf492013-09-12 20:42:11 +0000857
maruel@chromium.orgdedbf492013-09-12 20:42:11 +0000858
def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given storage location.

  Arguments:
    base_url:  Location of the storage, handed to get_storage_api(): either
               the url of an isolate server or a directory path.
    indir:     Root directory the infiles are based in.
    infiles:   dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server. It also decides whether
               the content is stored compressed.

  Returns:
    0, always; failures are reported through exceptions.
  """
  storage_api = get_storage_api(base_url, namespace)
  use_compression = is_namespace_with_compression(namespace)
  with Storage(storage_api, use_compression) as uploader:
    uploader.upload_tree(indir, infiles)
  return 0
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000874
875
class MemoryCache(object):
  """In-memory stand-in usable everywhere the Cache class is.

  Nothing is written to an on-disk cache: every fetched object is kept in a
  dict until it is stored in the target directory directly. Leaving the
  context manager drops everything that was kept.
  """

  def __init__(self, remote):
    self._contents = {}
    self._lock = threading.Lock()
    self._pool = None
    self.remote = remote

  def set_pool(self, pool):
    """Sets the pool used to run the fetches; required before retrieve()."""
    self._pool = pool

  def retrieve(self, priority, item, size):
    """Schedules the fetch of |item| on the pool with the given |priority|."""
    self._pool.add_task(priority, self._on_content, item, size)

  def wait_for(self, items):
    """Blocks until at least one of |items| is available and returns it.

    Items already in memory are served first, in the iteration order of
    |items|. Otherwise results are pulled from the pool until one of them is
    part of |items|.
    """
    nothing = object()
    with self._lock:
      # Flush items already present.
      ready = next(
          (candidate for candidate in items if candidate in self._contents),
          nothing)
    if ready is not nothing:
      return ready

    # The lock is not held here: the worker threads need it to publish
    # their result in _on_content().
    fetched = self._pool.get_one_result()
    while fetched not in items:
      fetched = self._pool.get_one_result()
    return fetched

  def add(self, filepath, item):
    """Loads the local file |filepath| in memory under the key |item|."""
    with self._lock, open(filepath, 'rb') as handle:
      self._contents[item] = handle.read()

  def read(self, item):
    """Returns the content kept for |item|; KeyError if it is not there."""
    return self._contents[item]

  def store_to(self, item, dest):
    """Writes the content kept for |item| to the file |dest|."""
    blob = self._contents[item]
    file_write(dest, [blob])

  def _on_content(self, item, size):
    """Pool task: fetches |item| from the remote and keeps it in memory."""
    chunks = self.remote.fetch(item, size)
    blob = ''.join(chunks)
    with self._lock:
      self._contents[item] = blob
    return item

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    # Release the memory; exceptions are never swallowed.
    with self._lock:
      self._contents = {}
    return False
936
937
def load_isolated(content, os_flavor, algo):
  """Verifies the .isolated file is valid and returns its parsed json data.

  Every top level key is validated; unknown keys are refused. Path separators
  found in the 'files' keys, in the 'l' (link) values and in 'relative_cwd'
  are converted to the native os.path.sep before the data is returned.

  Arguments:
  - content: raw serialized content to load.
  - os_flavor: OS to load this file on. Optional.
  - algo: hashlib algorithm class. Used to confirm the algorithm matches the
          algorithm used on the Isolate Server. When None, the algorithm
          named by the 'algo' key of the file is used, 'sha-1' if absent.

  Returns:
    The validated dict.

  Raises:
    ConfigError if the content is not valid json or fails validation.
  """
  try:
    data = json.loads(content)
  except ValueError:
    raise ConfigError('Failed to parse: %s...' % content[:100])

  if not isinstance(data, dict):
    raise ConfigError('Expected dict, got %r' % data)

  # Check 'version' first, since it could modify the parsing after.
  value = data.get('version', '1.0')
  if not isinstance(value, basestring):
    raise ConfigError('Expected string, got %r' % value)
  if not re.match(r'^(\d+)\.(\d+)$', value):
    raise ConfigError('Expected a compatible version, got %r' % value)
  if value.split('.', 1)[0] != '1':
    raise ConfigError('Expected compatible \'1.x\' version, got %r' % value)

  if algo is None:
    # Default the algorithm used in the .isolated file itself, falls back to
    # 'sha-1' if unspecified.
    # SUPPORTED_ALGOS is keyed by name while SUPPORTED_ALGOS_REVERSE is keyed
    # by algorithm (see their use for the 'algo' key below), so the name must
    # be resolved through SUPPORTED_ALGOS. The name is validated here since
    # the loop below may need |algo| before it reaches the 'algo' key.
    algo_name = data.get('algo', 'sha-1')
    if not isinstance(algo_name, basestring):
      raise ConfigError('Expected string, got %r' % algo_name)
    if algo_name not in SUPPORTED_ALGOS:
      raise ConfigError(
          'Expected one of \'%s\', got %r' %
          (', '.join(sorted(SUPPORTED_ALGOS)), algo_name))
    algo = SUPPORTED_ALGOS[algo_name]

  for key, value in data.iteritems():
    if key == 'algo':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)
      if value not in SUPPORTED_ALGOS:
        raise ConfigError(
            'Expected one of \'%s\', got %r' %
            (', '.join(sorted(SUPPORTED_ALGOS)), value))
      if value != SUPPORTED_ALGOS_REVERSE[algo]:
        raise ConfigError(
            'Expected \'%s\', got %r' % (SUPPORTED_ALGOS_REVERSE[algo], value))

    elif key == 'command':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty command')
      for subvalue in value:
        if not isinstance(subvalue, basestring):
          raise ConfigError('Expected string, got %r' % subvalue)

    elif key == 'files':
      if not isinstance(value, dict):
        raise ConfigError('Expected dict, got %r' % value)
      for subkey, subvalue in value.iteritems():
        if not isinstance(subkey, basestring):
          raise ConfigError('Expected string, got %r' % subkey)
        if not isinstance(subvalue, dict):
          raise ConfigError('Expected dict, got %r' % subvalue)
        for subsubkey, subsubvalue in subvalue.iteritems():
          if subsubkey == 'l':
            if not isinstance(subsubvalue, basestring):
              raise ConfigError('Expected string, got %r' % subsubvalue)
          elif subsubkey == 'm':
            if not isinstance(subsubvalue, int):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          elif subsubkey == 'h':
            if not is_valid_hash(subsubvalue, algo):
              raise ConfigError('Expected sha-1, got %r' % subsubvalue)
          elif subsubkey == 's':
            # json decodes large numbers as long; accept both, like the
            # size checks done elsewhere in this file.
            if not isinstance(subsubvalue, (int, long)):
              raise ConfigError('Expected int, got %r' % subsubvalue)
          else:
            raise ConfigError('Unknown subsubkey %s' % subsubkey)
        # An entry is either a file ('h' and 's', optionally 'm') or a
        # symlink ('l' alone).
        if bool('h' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'h\' (sha-1) or \'l\' (link), got: %r' %
              subvalue)
        if bool('h' in subvalue) != bool('s' in subvalue):
          raise ConfigError(
              'Both \'h\' (sha-1) and \'s\' (size) should be set, got: %r' %
              subvalue)
        if bool('s' in subvalue) == bool('l' in subvalue):
          raise ConfigError(
              'Need only one of \'s\' (size) or \'l\' (link), got: %r' %
              subvalue)
        if bool('l' in subvalue) and bool('m' in subvalue):
          raise ConfigError(
              'Cannot use \'m\' (mode) and \'l\' (link), got: %r' %
              subvalue)

    elif key == 'includes':
      if not isinstance(value, list):
        raise ConfigError('Expected list, got %r' % value)
      if not value:
        raise ConfigError('Expected non-empty includes list')
      for subvalue in value:
        if not is_valid_hash(subvalue, algo):
          raise ConfigError('Expected sha-1, got %r' % subvalue)

    elif key == 'read_only':
      if not isinstance(value, bool):
        raise ConfigError('Expected bool, got %r' % value)

    elif key == 'relative_cwd':
      if not isinstance(value, basestring):
        raise ConfigError('Expected string, got %r' % value)

    elif key == 'os':
      if os_flavor and value != os_flavor:
        raise ConfigError(
            'Expected \'os\' to be \'%s\' but got \'%s\'' %
            (os_flavor, value))

    elif key == 'version':
      # Already checked above.
      pass

    else:
      raise ConfigError('Unknown key %r' % key)

  # Automatically fix os.path.sep if necessary. While .isolated files are always
  # in the native path format, someone could want to download an .isolated
  # tree from another OS.
  wrong_path_sep = '/' if os.path.sep == '\\' else '\\'
  if 'files' in data:
    data['files'] = dict(
        (k.replace(wrong_path_sep, os.path.sep), v)
        for k, v in data['files'].iteritems())
    for v in data['files'].itervalues():
      if 'l' in v:
        v['l'] = v['l'].replace(wrong_path_sep, os.path.sep)
  if 'relative_cwd' in data:
    data['relative_cwd'] = data['relative_cwd'].replace(
        wrong_path_sep, os.path.sep)
  return data
1076
1077
class IsolatedFile(object):
  """Represents a single parsed .isolated file."""
  def __init__(self, obj_hash, algo):
    """|obj_hash| is really the sha-1 of the file."""
    logging.debug('IsolatedFile(%s)' % obj_hash)
    self.algo = algo
    self.obj_hash = obj_hash

    # Raw data.
    self.data = {}
    # A IsolatedFile instance, one per object in self.includes.
    self.children = []

    # Set once all the left-side of the tree is parsed. 'Tree' here means the
    # .isolate and all the .isolated files recursively included by it with
    # 'includes' key. The order of each sha-1 in 'includes', each representing a
    # .isolated file in the hash table, is important, as the later ones are not
    # processed until the firsts are retrieved and read.
    self.can_fetch = False
    # Set once the files are fetched.
    self.files_fetched = False
    # Set once the .isolated file is loaded.
    self._is_parsed = False

  def load(self, os_flavor, content):
    """Parses and validates |content|, then creates one child per include.

    May only be called once per instance.
    """
    logging.debug('IsolatedFile.load(%s)' % self.obj_hash)
    assert not self._is_parsed
    self.data = load_isolated(content, os_flavor, self.algo)
    included = []
    for include_hash in self.data.get('includes', []):
      included.append(IsolatedFile(include_hash, self.algo))
    self.children = included
    self._is_parsed = True

  def fetch_files(self, cache, files):
    """Adds files in this .isolated file not present in |files| dictionary.

    Preemptively request files.

    Does nothing until the file is parsed, and nothing once the files were
    requested.

    Note that |files| is modified by this function.
    """
    assert self.can_fetch
    if self.files_fetched or not self._is_parsed:
      return
    logging.debug('fetch_files(%s)' % self.obj_hash)
    for filepath, properties in self.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overriden files must not be fetched.
      if filepath in files:
        continue
      files[filepath] = properties
      if 'h' not in properties:
        # Nothing to download for this entry.
        continue
      # Preemptively request files.
      logging.debug('fetching %s' % filepath)
      cache.retrieve(WorkerPool.MED, properties['h'], properties['s'])
    self.files_fetched = True
1138
1139
class Settings(object):
  """Results of a completely parsed .isolated file.

  Aggregates the command, the files, the read_only flag and the relative_cwd
  found while walking the root .isolated file and everything it includes.
  For each property the first node processed that defines it wins.
  """
  def __init__(self):
    self.command = []
    # Maps each file path to its properties dict; filled by
    # IsolatedFile.fetch_files().
    self.files = {}
    # None means 'not found yet'; load() turns them into False and ''.
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, cache, root_isolated_hash, os_flavor, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    Arguments:
      cache: object providing retrieve(), wait_for() and read().
      root_isolated_hash: hash of the root .isolated file.
      os_flavor: passed through to IsolatedFile.load().
      algo: passed through to IsolatedFile.

    Raises:
      ConfigError if the same .isolated hash is requested more than once.
    """
    self.root = IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      # Requests the .isolated file at high priority and tracks it as pending.
      h = isolated_file.obj_hash
      if h in seen:
        raise ConfigError('IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      cache.retrieve(WorkerPool.HIGH, h, UNKNOWN_FILE_SIZE)

    retrieve(self.root)

    while pending:
      # Process the .isolated files in whatever order they become available.
      item_hash = cache.wait_for(pending)
      item = pending.pop(item_hash)
      item.load(os_flavor, cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(cache, self.root)

    def check(n):
      # True when |n| and all its descendants had their files requested.
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    # Replace the 'not found' markers by the effective defaults.
    self.relative_cwd = self.relative_cwd or ''
    self.read_only = self.read_only or False

  def _traverse_tree(self, cache, node):
    """Walks the tree depth first, unlocking nodes from left to right.

    A fetchable node gets its files requested. Among its children, at most one
    not yet fetchable child is marked fetchable per call; the walk over the
    children stops at the next one that is still not fetchable.
    """
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(cache, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(cache, i)

  def _update_self(self, cache, node):
    """Requests the files of |node| and merges its properties into self.

    A property already set from a previously processed node is never
    overwritten.
    """
    node.fetch_files(cache, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      # NOTE(review): self.command aliases node.data['command'] here, so the
      # in-place edit of element 0 below also changes the node's data.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1235
1236
def fetch_isolated(
    isolated_hash, cache, outdir, os_flavor, algo, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root .isolated file. A value that is not a
                   valid hash for |algo| is treated as the path of a local
                   .isolated file, which is hashed and added to |cache|.
    cache: cache object used to fetch and store the files; it is used as a
           context manager and is handed the worker pool via set_pool().
    outdir: directory where the files are written.
    os_flavor: passed through to Settings.load().
    algo: hash algorithm, passed to is_valid_hash(), hash_file() and
          Settings.load().
    require_command: if true, a tree without a command is an error.

  Returns:
    The Settings instance describing the loaded tree.

  Raises:
    ConfigError if |require_command| is set and no command was found.
  """
  settings = Settings()
  with WorkerPool() as pool:
    with cache:
      cache.set_pool(pool)
      with tools.Profiler('GetIsolateds'):
        # Optionally support local files.
        if not is_valid_hash(isolated_hash, algo):
          # Adds it in the cache. While not strictly necessary, this
          # simplifies the rest.
          h = hash_file(isolated_hash, algo)
          cache.add(isolated_hash, h)
          isolated_hash = h
        settings.load(cache, isolated_hash, os_flavor, algo)

      if require_command and not settings.command:
        raise ConfigError('No command to run')

      with tools.Profiler('GetRest'):
        create_directories(outdir, settings.files)
        create_links(outdir, settings.files.iteritems())
        # Indexed by fetched object below; each value is iterated as
        # (filepath, properties) pairs.
        remaining = generate_remaining_files(settings.files.iteritems())

        # The working directory may not contain any file; make sure it exists.
        cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
        if not os.path.isdir(cwd):
          os.makedirs(cwd)

        # Now block on the remaining files to be downloaded and mapped.
        logging.info('Retrieving remaining files')
        last_update = time.time()
        with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
          while remaining:
            detector.ping()
            obj = cache.wait_for(remaining)
            # The same object may have to be written to several paths.
            for filepath, properties in remaining.pop(obj):
              outfile = os.path.join(outdir, filepath)
              cache.store_to(obj, outfile)
              if 'm' in properties:
                # It's not set on Windows.
                os.chmod(outfile, properties['m'])

            # Report progress at most once per DELAY_BETWEEN_UPDATES_IN_SECS.
            duration = time.time() - last_update
            if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
              msg = '%d files remaining...' % len(remaining)
              print msg
              logging.info(msg)
              last_update = time.time()
  return settings
1288
1289
def download_isolated_tree(isolated_hash, target_directory, remote):
  """Downloads the tree described by |isolated_hash| into |target_directory|.

  The directory is created when missing. Files go through a MemoryCache bound
  to |remote|, and a command is not required in the .isolated file.

  Returns:
    The Settings instance returned by fetch_isolated().
  """
  directory_exists = os.path.exists(target_directory)
  if not directory_exists:
    os.makedirs(target_directory)

  return fetch_isolated(
      isolated_hash=isolated_hash,
      cache=MemoryCache(remote),
      outdir=target_directory,
      os_flavor=None,
      algo=remote.algo,
      require_command=False)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001298
1299
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  The files to archive are given on the command line, or one per line on
  stdin when the only argument is '-'. On success, one line with the hash and
  the path is printed for each archived file.

  Returns:
    The value returned by upload_tree().
  """
  options, files = parser.parse_args(args)

  if files == ['-']:
    # Lines read from stdin keep their line terminator, which must not end up
    # in the file names. Blank lines are ignored.
    files = [line.rstrip('\r\n') for line in sys.stdin]
    files = [f for f in files if f]

  if not files:
    parser.error('Nothing to upload')

  # Load the necessary metadata.
  # TODO(maruel): Use a worker pool to upload as the hashing is being done.
  algo = get_hash_algo(options.namespace)
  infiles = dict(
      (
        f,
        {
          's': os.stat(f).st_size,
          'h': hash_file(f, algo),
        }
      )
      for f in files)

  with tools.Profiler('Archive'):
    ret = upload_tree(
        base_url=options.isolate_server,
        indir=os.getcwd(),
        infiles=infiles,
        namespace=options.namespace)
  if not ret:
    print('\n'.join('%s %s' % (infiles[f]['h'], f) for f in sorted(infiles)))
  return ret
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001332
1333
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Exactly one of the two modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)
  remote = get_storage_api(options.isolate_server, options.namespace)

  # Individual files mode; |options.file| is empty in tree mode.
  for digest, dest in options.file:
    logging.info('%s: %s', digest, dest)
    destination = os.path.join(options.target, dest)
    file_write(destination, remote.fetch(digest, UNKNOWN_FILE_SIZE))

  if not options.isolated:
    return 0

  # Tree mode.
  settings = download_isolated_tree(options.isolated, options.target, remote)
  # |options.target| is absolute, so joining it once gives the final path.
  run_dir = os.path.join(options.target, settings.relative_cwd)
  print('To run this test please run from the directory %s:' % run_dir)
  print(' ' + ' '.join(settings.command))
  return 0
1370
1371
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser adding the --isolate-server and --namespace options.

  --isolate-server is mandatory; trailing '/' are stripped from its value.
  """
  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(self, **kwargs)
    self.add_option(
        '-I', '--isolate-server',
        metavar='URL', default='',
        help='Isolate server to use')
    self.add_option(
        '--namespace', default='default-gzip',
        help='The namespace to use on the server, default: %default')

  def parse_args(self, *args, **kwargs):
    """Parses the arguments and normalizes options.isolate_server."""
    parsed = tools.OptionParserWithLogging.parse_args(self, *args, **kwargs)
    options, remaining = parsed
    server = options.isolate_server.rstrip('/')
    options.isolate_server = server
    if not server:
      self.error('--isolate-server is required.')
    return options, remaining
1390
1391
def main(args):
  """Runs the subcommand selected by |args| and returns the exit code.

  ConfigError and MappingError are reported on stderr and turned into the
  exit code 1; any other exception propagates.
  """
  dispatcher = subcommand.CommandDispatcher(__name__)
  try:
    parser = OptionParserIsolateServer(version=__version__)
    return dispatcher.execute(parser, args)
  except (ConfigError, MappingError) as e:
    emit = sys.stderr.write
    emit('\nError: ')
    emit(str(e))
    emit('\n')
    return 1
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001402
1403
# Script entry point: set up the console (encoding, buffering, colors), then
# exit with the code returned by main().
if __name__ == '__main__':
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))