blob: 04c312e7ff5dd9ca7cfb971d238450f1c61cca7f [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04008__version__ = '0.3.4'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000013import re
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050014import shutil
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000015import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050016import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000017import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000019import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050020import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000021import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000022
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000023from third_party import colorama
24from third_party.depot_tools import fix_encoding
25from third_party.depot_tools import subcommand
26
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050027from utils import file_path
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000028from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040029from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000030from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000031from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000032
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080033import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040034import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080035
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000036
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps (matched against relative paths) of files that should never be
# archived.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
  r'^.+\.(?:run_test_cases)$',
  r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
102
103
class Error(Exception):
  """Generic runtime error raised by this module."""
107
108
def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them.

  Stops at the first empty read, i.e. at end of stream.
  """
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
116
117
def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as f:
    # Skip the prefix only when asked; seek(0) would be a no-op anyway.
    if offset:
      f.seek(offset)
    chunk = f.read(chunk_size)
    while chunk:
      yield chunk
      chunk = f.read(chunk_size)
128
129
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent_dir = os.path.dirname(filepath)
  if not os.path.isdir(parent_dir):
    os.makedirs(parent_dir)
  written = 0
  with open(filepath, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000148
149
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  # Emit whatever zlib buffered, plus the stream trailer.
  out = deflater.flush(zlib.Z_FINISH)
  if out:
    yield out
160
161
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  bytes_read = 0
  try:
    for piece in content_generator:
      bytes_read += len(piece)
      # First call consumes |piece|; the inner loop drains whatever zlib
      # could not fit into a single |chunk_size| output buffer.
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    leftover = inflater.flush()
    if leftover:
      yield leftover
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (bytes_read, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
192
193
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store, no compression) for file types that are already
  compressed, 7 otherwise.
  """
  # os.path.splitext() keeps the leading dot (e.g. '.png'), while
  # ALREADY_COMPRESSED_TYPES lists bare extensions (e.g. 'png'). Strip the
  # dot so the membership test can actually match; without this the check
  # never matched and already-compressed files were recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
199
200
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory (relative paths) of every file.
  to_create = set()
  for f in files:
    parent = os.path.dirname(f)
    while parent and parent not in to_create:
      to_create.add(parent)
      parent = os.path.dirname(parent)
  # Lexicographic order guarantees a parent is created before its children.
  for rel_dir in sorted(to_create):
    os.mkdir(os.path.join(base_directory, rel_dir))
213
214
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  # Only entries carrying an 'l' (link target) property are symlinks.
  symlinks = ((path, props['l']) for path, props in files if 'l' in props)
  for filepath, target in symlinks:
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    destination = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(target, destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000227
228
def is_valid_file(filepath, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  # For unknown sizes (e.g. .isolated files) mere existence is good enough.
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size == actual_size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
243
244
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Content digest and size; either provided up front or filled by prepare().
    self.digest = digest
    self.size = size
    # High priority items jump the upload queue.
    self.high_priority = high_priority
    # Default zlib level; subclasses may override per content type.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size is already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    length = 0
    for chunk in self.content():
      hasher.update(chunk)
      length += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = length
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000284
285
class FileItem(Item):
  """A file on disk to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Stat the file only when the caller didn't supply the size.
    if size is None:
      size = os.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    # Already-compressed file types are stored, not recompressed.
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    """Streams the file from disk in chunks."""
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000303
304
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Size is known immediately; digest is computed lazily by prepare().
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    """Returns the whole buffer as a single-chunk iterable."""
    return [self.buffer]
314
315
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe.
  """

  def __init__(self, storage_api):
    # Low-level transport (e.g. IsolateServer or FileSystem backend).
    self._storage_api = storage_api
    # Whether payloads are zipped before upload / unzipped after download;
    # derived from the namespace name.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily on first use, see properties below.
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """Location of a backing store that this class is using.

    Exact meaning depends on the storage_api type. For IsolateServer it is
    an URL of isolate server, for FileSystem is it a path in file system.
    """
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping.

    Created lazily; sized by the number of processors (at least 2 workers).
    """
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError.

    Created lazily on first use.
    """
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish.

    Idempotent: pools are reset to None so a second close() is a no-op.
    """
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface.

    Returns False so any exception raised in the 'with' body propagates.
    """
    self.close()
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns an URL that can be used to fetch given item once it's uploaded.

    Note that if namespace uses compression, data at given URL is compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    # prepare() is needed because the URL is keyed by the item's digest.
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Propagate the failure to whoever is pulling from |channel|.
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      # channel.pull() blocks until the upload worker reports back.
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
          describing how to upload the item (for example in case of cloud
          storage, it is signed upload URLs). It can later be passed to
          'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH,
          self._storage_api.contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      # Each pull() returns a dict {missing Item: push_state} for one batch.
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000621
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000622
def batch_items_for_check(items, batch_size_limits=None):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.
    batch_size_limits: optional sequence of growing batch size limits to use
        instead of the module default ITEMS_PER_CONTAINS_QUERIES. The last
        value keeps being used once the sequence is exhausted.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  if batch_size_limits is None:
    batch_size_limits = ITEMS_PER_CONTAINS_QUERIES
  batch_count = 0
  batch_size_limit = batch_size_limits[0]
  next_queries = []
  # Iterate from the largest items down, so the biggest ones are queried (and
  # can thus start uploading) first.
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      # Ramp the batch size up according to the limits schedule; clamp to the
      # last configured limit once the schedule runs out.
      batch_size_limit = batch_size_limits[
          min(batch_count, len(batch_size_limits) - 1)]
  # Flush the final, possibly partial, batch.
  if next_queries:
    yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000649
650
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    """Arguments:
      storage: Storage instance used to perform the actual fetches.
      cache: LocalCache instance that receives the fetched data.
    """
    self.storage = storage
    self.cache = cache
    # Channel through which completed fetches are reported back (see 'wait').
    self._channel = threading_utils.TaskChannel()
    # Digests with a fetch currently in flight.
    self._pending = set()
    # Every digest ever requested via 'add'; checked by verify_all_cached.
    self._accessed = set()
    # Digests already present in the cache.
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|.

    Arguments:
      digest: hash digest of the item to fetch.
      size: expected size of the item, if known; passed to cache.touch to
          validate an already cached copy.
      priority: thread pool priority for the fetch task.
    """
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      # pull() blocks until some fetch completes (or re-raises its error).
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage.

    Arguments:
      path: path of the local file to read.
      algo: hashing algorithm constructor (e.g. hashlib.sha1) used to derive
          the digest of the file's content.

    Returns:
      Hex digest under which the file content was cached.
    """
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
746
747
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    """Arguments:
      stream: an iterable yielding chunks (str) of the fetched item.
      expected_size: total expected size in bytes, or UNKNOWN_FILE_SIZE to
          skip the size verification.
    """
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)
    else:
      # The stream yielded no chunks at all. Still run the final size check:
      # without it a fetch truncated to zero bytes of a file with a known
      # non-zero size would pass through silently.
      self._inspect_chunk('', is_last=True)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer.

    Raises IOError if the last chunk completes the stream at a total size
    different from |expected_size| (when the latter is known).
    """
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
794
795
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious of compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """Location of a backing store that this class is using.

    Exact meaning depends on the type. For IsolateServer it is an URL of
    isolate server, for FileSystem it is a path in the file system.
    """
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, data that should be pushed
    and data provided by the item is not the same. In that case |content| is
    not None and it yields chunks of compressed data (using item.content() as
    a source of original uncompressed data). This is implemented by Storage
    class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()
884
885
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800886class _IsolateServerPushState(object):
887 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -0500888
889 Note this needs to be a global class to support pickling.
890 """
891
892 def __init__(self, upload_url, finalize_url):
893 self.upload_url = upload_url
894 self.finalize_url = finalize_url
895 self.uploaded = False
896 self.finalized = False
897
898
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    """Arguments:
      base_url: root URL of the isolate server, e.g. 'https://host'. Must use
          an http(s) scheme; a trailing slash is stripped.
      namespace: isolate namespace all operations are scoped to.
    """
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    # Guards lazy initialization of |_server_caps| in _server_capabilities.
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise isolated_format.MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    The handshake result is cached in |_server_caps|; only the first access
    performs a network round trip.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.
    with self._lock:
      if self._server_caps is None:
        try:
          caps = net.url_read_json(
              url=self._base_url + '/content-gs/handshake',
              data=self._generate_handshake_request())
          if caps is None:
            raise isolated_format.MappingError('Failed to perform handshake.')
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise isolated_format.MappingError(
              'Invalid handshake response (%s): %s' % (
                  exc.__class__.__name__, exc))
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    """See StorageApi.get_fetch_url."""
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    """See StorageApi.fetch.

    Raises IOError on connection failures or when the server ignores or
    mangles a requested byte range.
    """
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    connection = net.url_open(
        source_url,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    """See StorageApi.push.

    Uploads the data with a PUT to |push_state.upload_url|, then optionally
    finalizes with a POST to |push_state.finalize_url|. Progress is recorded
    in |push_state| so a retry resumes after the last completed stage.
    """
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content

    # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, push_state.upload_url))
      push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      # TODO(maruel): Fix the server to accept proper data={} so
      # url_read_json() can be used.
      response = net.url_read(
          url=push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      push_state.finalized = True

  def contains(self, items):
    """See StorageApi.contains.

    Sends a single /pre-upload query for all |items|; the server responds
    with per-item upload URLs (or null when the item is already stored).
    """
    logging.info('Checking existence of %d files...', len(items))

    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.high_priority),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self._base_url,
        self._namespace,
        urllib.quote(self._server_capabilities['access_token']))

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute /pre-upload query')
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        missing_items[items[i]] = _IsolateServerPushState(
            push_urls[0], push_urls[1])
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001135
1136
class FileSystem(StorageApi):
  """StorageApi implementation that reads and writes files on a local path.

  The common use case is a NFS/CIFS file server that is mounted locally so
  items are fetched with a plain file copy from the mounted partition.
  """

  # Sentinel handed out by 'contains' and demanded back by 'push'. Forces the
  # caller to go through 'contains' first: naively passing None to 'push'
  # trips the assert there.
  _DUMMY_PUSH_STATE = object()

  def __init__(self, base_path, namespace):
    super(FileSystem, self).__init__()
    self._base_path = base_path
    self._namespace = namespace

  @property
  def location(self):
    return self._base_path

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    # Direct fetch URLs are not supported by this backend.
    return None

  def fetch(self, digest, offset=0):
    assert isinstance(digest, basestring)
    item_path = os.path.join(self._base_path, digest)
    return file_read(item_path, offset=offset)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert push_state is self._DUMMY_PUSH_STATE
    if content is None:
      content = item.content()
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]
    file_write(os.path.join(self._base_path, item.digest), content)

  def contains(self, items):
    assert all(i.digest is not None and i.size is not None for i in items)
    # An item is "missing" when no file with its digest exists under the base
    # path.
    absent = {}
    for item in items:
      if not os.path.exists(os.path.join(self._base_path, item.digest)):
        absent[item] = self._DUMMY_PUSH_STATE
    return absent
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001185
1186
class LocalCache(object):
  """Base interface for a local store of objects fetched via Storage.

  Implementations may be called concurrently from multiple threads and must
  guard their internal state with a lock of their own.
  """
  # Path of the cache directory; None by default.
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Checks item integrity and refreshes its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Consumes the |content| chunk generator and stores it under |digest|."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()
1238
1239
1240class MemoryCache(LocalCache):
1241 """LocalCache implementation that stores everything in memory."""
1242
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001243 def __init__(self, file_mode_mask=0500):
1244 """Args:
1245 file_mode_mask: bit mask to AND file mode with. Default value will make
1246 all mapped files to be read only.
1247 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001248 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001249 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001250 # Let's not assume dict is thread safe.
1251 self._lock = threading.Lock()
1252 self._contents = {}
1253
1254 def cached_set(self):
1255 with self._lock:
1256 return set(self._contents)
1257
1258 def touch(self, digest, size):
1259 with self._lock:
1260 return digest in self._contents
1261
1262 def evict(self, digest):
1263 with self._lock:
1264 self._contents.pop(digest, None)
1265
1266 def read(self, digest):
1267 with self._lock:
1268 return self._contents[digest]
1269
1270 def write(self, digest, content):
1271 # Assemble whole stream before taking the lock.
1272 data = ''.join(content)
1273 with self._lock:
1274 self._contents[digest] = data
1275
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001276 def hardlink(self, digest, dest, file_mode):
1277 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001278 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001279 if file_mode is not None:
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001280 os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001281
1282
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001283class IsolatedBundle(object):
1284 """Fetched and parsed .isolated file with all dependencies."""
1285
  def __init__(self):
    """Initializes an empty bundle; call fetch() to populate it."""
    # NOTE(review): the fields below appear to mirror .isolated file entries
    # (command, files, read_only, relative_cwd) — confirm against fetch().
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None
1293
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001294 def fetch(self, fetch_queue, root_isolated_hash, algo):
1295 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001296
1297 It enables support for "included" .isolated files. They are processed in
1298 strict order but fetched asynchronously from the cache. This is important so
1299 that a file in an included .isolated file that is overridden by an embedding
1300 .isolated file is not fetched needlessly. The includes are fetched in one
1301 pass and the files are fetched as soon as all the ones on the left-side
1302 of the tree were fetched.
1303
1304 The prioritization is very important here for nested .isolated files.
1305 'includes' have the highest priority and the algorithm is optimized for both
1306 deep and wide trees. A deep one is a long link of .isolated files referenced
1307 one at a time by one item in 'includes'. A wide one has a large number of
1308 'includes' in a single .isolated file. 'left' is defined as an included
1309 .isolated file earlier in the 'includes' list. So the order of the elements
1310 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001311
1312 As a side effect this method starts asynchronous fetch of all data files
1313 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1314 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001315 """
1316 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1317
1318 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1319 pending = {}
1320 # Set of hashes of already retrieved items to refuse recursive includes.
1321 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001322 # Set of IsolatedFile's whose data files have already being fetched.
1323 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001324
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001325 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001326 h = isolated_file.obj_hash
1327 if h in seen:
1328 raise isolated_format.IsolatedError(
1329 'IsolatedFile %s is retrieved recursively' % h)
1330 assert h not in pending
1331 seen.add(h)
1332 pending[h] = isolated_file
1333 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1334
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001335 # Start fetching root *.isolated file (single file, not the whole bundle).
1336 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001337
1338 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001339 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001340 item_hash = fetch_queue.wait(pending)
1341 item = pending.pop(item_hash)
1342 item.load(fetch_queue.cache.read(item_hash))
Vadim Shtayura3148e072014-09-02 18:51:52 -07001343
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001344 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001345 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001346 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001347
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001348 # Always fetch *.isolated files in traversal order, waiting if necessary
1349 # until next to-be-processed node loads. "Waiting" is done by yielding
1350 # back to the outer loop, that waits until some *.isolated is loaded.
1351 for node in isolated_format.walk_includes(self.root):
1352 if node not in processed:
1353 # Not visited, and not yet loaded -> wait for it to load.
1354 if not node.is_loaded:
1355 break
1356 # Not visited and loaded -> process it and continue the traversal.
1357 self._start_fetching_files(node, fetch_queue)
1358 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001359
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001360 # All *.isolated files should be processed by now and only them.
1361 all_isolateds = set(isolated_format.walk_includes(self.root))
1362 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001363
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001364 # Extract 'command' and other bundle properties.
1365 for node in isolated_format.walk_includes(self.root):
1366 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001367 self.relative_cwd = self.relative_cwd or ''
1368
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001369 def _start_fetching_files(self, isolated, fetch_queue):
1370 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001371
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001372 Modifies self.files.
1373 """
1374 logging.debug('fetch_files(%s)', isolated.obj_hash)
1375 for filepath, properties in isolated.data.get('files', {}).iteritems():
1376 # Root isolated has priority on the files being mapped. In particular,
1377 # overridden files must not be fetched.
1378 if filepath not in self.files:
1379 self.files[filepath] = properties
1380 if 'h' in properties:
1381 # Preemptively request files.
1382 logging.debug('fetching %s', filepath)
1383 fetch_queue.add(
1384 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1385
1386 def _update_self(self, node):
1387 """Extracts bundle global parameters from loaded *.isolated file.
1388
1389 Will be called with each loaded *.isolated file in order of traversal of
1390 isolated include graph (see isolated_format.walk_includes).
1391 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001392 # Grabs properties.
1393 if not self.command and node.data.get('command'):
1394 # Ensure paths are correctly separated on windows.
1395 self.command = node.data['command']
1396 if self.command:
1397 self.command[0] = self.command[0].replace('/', os.path.sep)
1398 self.command = tools.fix_python_path(self.command)
1399 if self.read_only is None and node.data.get('read_only') is not None:
1400 self.read_only = node.data['read_only']
1401 if (self.relative_cwd is None and
1402 node.data.get('relative_cwd') is not None):
1403 self.relative_cwd = node.data['relative_cwd']
1404
1405
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001406def get_storage_api(file_or_url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001407 """Returns an object that implements low-level StorageApi interface.
1408
1409 It is used by Storage to work with single isolate |namespace|. It should
1410 rarely be used directly by clients, see 'get_storage' for
1411 a better alternative.
1412
1413 Arguments:
1414 file_or_url: a file path to use file system based storage, or URL of isolate
1415 service to use shared cloud based storage.
1416 namespace: isolate namespace to operate in, also defines hashing and
1417 compression scheme used, i.e. namespace names that end with '-gzip'
1418 store compressed data.
1419
1420 Returns:
1421 Instance of StorageApi subclass.
1422 """
Marc-Antoine Ruel37989932013-11-19 16:28:08 -05001423 if file_path.is_url(file_or_url):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001424 return IsolateServer(file_or_url, namespace)
1425 else:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001426 return FileSystem(file_or_url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001427
1428
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001429def get_storage(file_or_url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001430 """Returns Storage class that can upload and download from |namespace|.
1431
1432 Arguments:
1433 file_or_url: a file path to use file system based storage, or URL of isolate
1434 service to use shared cloud based storage.
1435 namespace: isolate namespace to operate in, also defines hashing and
1436 compression scheme used, i.e. namespace names that end with '-gzip'
1437 store compressed data.
1438
1439 Returns:
1440 Instance of Storage.
1441 """
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001442 return Storage(get_storage_api(file_or_url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001443
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001444
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001445def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001446 """Uploads the given tree to the given url.
1447
1448 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001449 base_url: The url of the isolate server to upload to.
1450 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001451 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001452 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001453 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001454 # Filter out symlinks, since they are not represented by items on isolate
1455 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001456 items = []
1457 seen = set()
1458 skipped = 0
1459 for filepath, metadata in infiles:
1460 if 'l' not in metadata and filepath not in seen:
1461 seen.add(filepath)
1462 item = FileItem(
1463 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001464 digest=metadata['h'],
1465 size=metadata['s'],
1466 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001467 items.append(item)
1468 else:
1469 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001470
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001471 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001472 with get_storage(base_url, namespace) as storage:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001473 storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001474
1475
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001476def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001477 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001478
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001479 Arguments:
1480 isolated_hash: hash of the root *.isolated file.
1481 storage: Storage class that communicates with isolate storage.
1482 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001483 outdir: Output directory to map file tree to.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001484 require_command: Ensure *.isolated specifies a command to run.
1485
1486 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001487 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001488 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001489 logging.debug(
1490 'fetch_isolated(%s, %s, %s, %s, %s)',
1491 isolated_hash, storage, cache, outdir, require_command)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001492 # Hash algorithm to use, defined by namespace |storage| is using.
1493 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001494 with cache:
1495 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001496 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001497
1498 with tools.Profiler('GetIsolateds'):
1499 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001500 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001501 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
1502 try:
1503 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1504 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001505 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001506 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
1507 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001508
1509 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001510 bundle.fetch(fetch_queue, isolated_hash, algo)
1511 if require_command and not bundle.command:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001512 # TODO(vadimsh): All fetch operations are already enqueue and there's no
1513 # easy way to cancel them.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001514 raise isolated_format.IsolatedError('No command to run')
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001515
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001516 with tools.Profiler('GetRest'):
1517 # Create file system hierarchy.
1518 if not os.path.isdir(outdir):
1519 os.makedirs(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001520 create_directories(outdir, bundle.files)
1521 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001522
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001523 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001524 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001525 if not os.path.isdir(cwd):
1526 os.makedirs(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001527
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001528 # Multimap: digest -> list of pairs (path, props).
1529 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001530 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001531 if 'h' in props:
1532 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001533
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001534 # Now block on the remaining files to be downloaded and mapped.
1535 logging.info('Retrieving remaining files (%d of them)...',
1536 fetch_queue.pending_count)
1537 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001538 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001539 while remaining:
1540 detector.ping()
1541
1542 # Wait for any item to finish fetching to cache.
1543 digest = fetch_queue.wait(remaining)
1544
1545 # Link corresponding files to a fetched item in cache.
1546 for filepath, props in remaining.pop(digest):
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001547 cache.hardlink(
1548 digest, os.path.join(outdir, filepath), props.get('m'))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001549
1550 # Report progress.
1551 duration = time.time() - last_update
1552 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1553 msg = '%d files remaining...' % len(remaining)
1554 print msg
1555 logging.info(msg)
1556 last_update = time.time()
1557
1558 # Cache could evict some items we just tried to fetch, it's a fatal error.
1559 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001560 raise isolated_format.MappingError(
1561 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001562 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001563
1564
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001565def directory_to_metadata(root, algo, blacklist):
1566 """Returns the FileItem list and .isolated metadata for a directory."""
1567 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001568 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001569 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001570 metadata = {
1571 relpath: isolated_format.file_to_metadata(
1572 os.path.join(root, relpath), {}, False, algo)
1573 for relpath in paths
1574 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001575 for v in metadata.itervalues():
1576 v.pop('t')
1577 items = [
1578 FileItem(
1579 path=os.path.join(root, relpath),
1580 digest=meta['h'],
1581 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001582 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001583 for relpath, meta in metadata.iteritems() if 'h' in meta
1584 ]
1585 return items, metadata
1586
1587
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001588def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001589 """Stores every entries and returns the relevant data.
1590
1591 Arguments:
1592 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001593 files: list of file paths to upload. If a directory is specified, a
1594 .isolated file is created and its hash is returned.
1595 blacklist: function that returns True if a file should be omitted.
1596 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001597 assert all(isinstance(i, unicode) for i in files), files
1598 if len(files) != len(set(map(os.path.abspath, files))):
1599 raise Error('Duplicate entries found.')
1600
1601 results = []
1602 # The temporary directory is only created as needed.
1603 tempdir = None
1604 try:
1605 # TODO(maruel): Yield the files to a worker thread.
1606 items_to_upload = []
1607 for f in files:
1608 try:
1609 filepath = os.path.abspath(f)
1610 if os.path.isdir(filepath):
1611 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001612 items, metadata = directory_to_metadata(
1613 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001614
1615 # Create the .isolated file.
1616 if not tempdir:
1617 tempdir = tempfile.mkdtemp(prefix='isolateserver')
1618 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
1619 os.close(handle)
1620 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001621 'algo':
1622 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001623 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001624 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001625 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001626 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001627 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001628 items_to_upload.extend(items)
1629 items_to_upload.append(
1630 FileItem(
1631 path=isolated,
1632 digest=h,
1633 size=os.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001634 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001635 results.append((h, f))
1636
1637 elif os.path.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001638 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001639 items_to_upload.append(
1640 FileItem(
1641 path=filepath,
1642 digest=h,
1643 size=os.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001644 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001645 results.append((h, f))
1646 else:
1647 raise Error('%s is neither a file or directory.' % f)
1648 except OSError:
1649 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001650 # Technically we would care about which files were uploaded but we don't
1651 # much in practice.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001652 _uploaded_files = storage.upload_items(items_to_upload)
1653 return results
1654 finally:
1655 if tempdir:
1656 shutil.rmtree(tempdir)
1657
1658
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001659def archive(out, namespace, files, blacklist):
1660 if files == ['-']:
1661 files = sys.stdin.readlines()
1662
1663 if not files:
1664 raise Error('Nothing to upload')
1665
1666 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001667 blacklist = tools.gen_blacklist(blacklist)
1668 with get_storage(out, namespace) as storage:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001669 results = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001670 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1671
1672
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001673@subcommand.usage('<file1..fileN> or - to read from stdin')
1674def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001675 """Archives data to the server.
1676
1677 If a directory is specified, a .isolated file is created the whole directory
1678 is uploaded. Then this .isolated file can be included in another one to run
1679 commands.
1680
1681 The commands output each file that was processed with its content hash. For
1682 directories, the .isolated generated for the directory is listed as the
1683 directory entry itself.
1684 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001685 add_isolate_server_options(parser, False)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001686 parser.add_option(
1687 '--blacklist',
1688 action='append', default=list(DEFAULT_BLACKLIST),
1689 help='List of regexp to use as blacklist filter when uploading '
1690 'directories')
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001691 options, files = parser.parse_args(args)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001692 process_isolate_server_options(parser, options)
Vadim Shtayura6b555c12014-07-23 16:22:18 -07001693 if file_path.is_url(options.isolate_server):
1694 auth.ensure_logged_in(options.isolate_server)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001695 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001696 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001697 except Error as e:
1698 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001699 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001700
1701
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.

  Returns 0 on success; aborts via parser.error() on invalid usage.
  """
  add_isolate_server_options(parser, True)
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Exactly one of the two download modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)

  remote = options.isolate_server or options.indir
  # Authentication only applies to a real server, not to --indir storage.
  if file_path.is_url(remote):
    auth.ensure_logged_in(remote)

  with get_storage(remote, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=MemoryCache(),
          outdir=options.target,
          require_command=False)
      # |rel| is already rooted at |options.target|; the old code joined the
      # two a second time, which only worked because os.path.join() discards
      # its first argument when the second one is absolute.
      rel = os.path.join(options.target, bundle.relative_cwd)
      print('To run this test please run from the directory %s:' % rel)
      print(' ' + ' '.join(bundle.command))

  return 0
1764
1765
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001766def add_isolate_server_options(parser, add_indir):
1767 """Adds --isolate-server and --namespace options to parser.
1768
1769 Includes --indir if desired.
1770 """
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001771 parser.add_option(
1772 '-I', '--isolate-server',
1773 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001774 help='URL of the Isolate Server to use. Defaults to the environment '
1775 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1776 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001777 parser.add_option(
1778 '--namespace', default='default-gzip',
1779 help='The namespace to use on the Isolate Server, default: %default')
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001780 if add_indir:
1781 parser.add_option(
1782 '--indir', metavar='DIR',
1783 help='Directory used to store the hashtable instead of using an '
1784 'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001785
1786
def process_isolate_server_options(parser, options):
  """Validates --isolate-server/--indir and normalizes them in place.

  Aborts through parser.error() when neither (or both) storage locations are
  provided, or when a value is malformed.
  """
  has_indir = hasattr(options, 'indir')
  if not options.isolate_server:
    if not has_indir:
      parser.error('--isolate-server is required.')
    elif not options.indir:
      parser.error('Use one of --indir or --isolate-server.')
  else:
    if has_indir and options.indir:
      parser.error('Use only one of --indir or --isolate-server.')

  if options.isolate_server:
    parsed = urlparse.urlparse(options.isolate_server, 'https')
    if parsed.query:
      parser.error('--isolate-server doesn\'t support query parameter.')
    if parsed.fragment:
      parser.error('--isolate-server doesn\'t support fragment in the url.')
    # urlparse('foo.com') will result in netloc='', path='foo.com', which is not
    # what is desired here.
    fields = list(parsed)
    if not fields[1] and fields[2]:
      # A bare host without scheme ends up in the path slot; move it to the
      # netloc slot.
      fields[1] = fields[2].rstrip('/')
      fields[2] = ''
    fields[2] = fields[2].rstrip('/')
    options.isolate_server = urlparse.urlunparse(fields)
    on_error.report_on_exception_exit(options.isolate_server)
    return

  # File system based storage: normalize --indir to a native absolute path.
  if file_path.is_url(options.indir):
    parser.error('Can\'t use an URL for --indir.')
  native_indir = unicode(options.indir).replace('/', os.path.sep)
  options.indir = os.path.abspath(
      os.path.normpath(os.path.join(os.getcwd(), native_indir)))
  if not os.path.isdir(options.indir):
    parser.error('Path given to --indir must exist.')
1825
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001826
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001827class OptionParserIsolateServer(tools.OptionParserWithLogging):
1828 def __init__(self, **kwargs):
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001829 tools.OptionParserWithLogging.__init__(
1830 self,
1831 version=__version__,
1832 prog=os.path.basename(sys.modules[__name__].__file__),
1833 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001834 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001835
1836 def parse_args(self, *args, **kwargs):
1837 options, args = tools.OptionParserWithLogging.parse_args(
1838 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08001839 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001840 return options, args
1841
1842
def main(args):
  """Dispatches |args| to the matching CMD* handler and returns its result."""
  parser = OptionParserIsolateServer()
  return subcommand.CommandDispatcher(__name__).execute(parser, args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001846
1847
if __name__ == '__main__':
  # One-time process-wide setup (stream encoding, unbuffered output, console
  # colors) before dispatching to the selected subcommand.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))