blob: ffa8819887d7cf95fbe4a22b31c1b4f2bf9ff666 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04008__version__ = '0.3.4'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000013import re
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050014import shutil
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000015import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050016import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000017import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000019import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050020import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000021import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000022
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000023from third_party import colorama
24from third_party.depot_tools import fix_encoding
25from third_party.depot_tools import subcommand
26
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050027from utils import file_path
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000028from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040029from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000030from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000031from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000032
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080033import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040034import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080035
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000036
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regular expressions of files to skip when archiving.
# NOTE(review): presumably matched against paths relative to the archived
# root - confirm against the callers that apply this blacklist.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
    r'^.+\.(?:run_test_cases)$',
    r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
102
103
class Error(Exception):
  """Generic runtime error raised by this module."""
107
108
def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them.

  Stops when stream.read() returns a falsy value (end of stream).
  """
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
116
117
def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as stream:
    if offset:
      stream.seek(offset)
    chunk = stream.read(chunk_size)
    while chunk:
      yield chunk
      chunk = stream.read(chunk_size)
128
129
def file_write(filepath, content_generator):
  """Writes chunks produced by |content_generator| into |filepath|.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent_dir = os.path.dirname(filepath)
  if not os.path.isdir(parent_dir):
    os.makedirs(parent_dir)
  written = 0
  with open(filepath, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000148
149
def zip_compress(content_generator, level=7):
  """Compresses chunks from |content_generator| with zlib, yielding the output.

  Only non-empty compressed chunks are yielded; the final flush is emitted
  last so the concatenated output is a complete zlib stream.
  """
  deflater = zlib.compressobj(level)
  for raw in content_generator:
    out = deflater.compress(raw)
    if out:
      yield out
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
160
161
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  bytes_read = 0
  try:
    for piece in content_generator:
      bytes_read += len(piece)
      # Bound each decompress call by |chunk_size|; leftover input stays in
      # unconsumed_tail and is drained below.
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (bytes_read, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
192
193
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store uncompressed) for file types that are already compressed,
  7 otherwise.
  """
  # os.path.splitext() returns the extension *with* its leading dot (e.g.
  # '.png'), while ALREADY_COMPRESSED_TYPES lists bare extensions ('png').
  # Strip the dot so the membership test can actually match; without this the
  # check was always False and already-compressed files were re-compressed.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
199
200
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file, relative to the base.
  to_create = set()
  for relpath in files:
    parent = os.path.dirname(relpath)
    while parent and parent not in to_create:
      to_create.add(parent)
      parent = os.path.dirname(parent)
  # Lexicographic order guarantees a parent is created before its children.
  for directory in sorted(to_create):
    os.mkdir(os.path.join(base_directory, directory))
213
214
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files.

  |files| is an iterable of (relative path, properties dict) pairs; only
  entries whose properties carry an 'l' key (the link target) are processed.
  """
  for filepath, properties in files:
    if 'l' in properties:
      if sys.platform == 'win32':
        # TODO(maruel): Create symlink via the win32 api.
        logging.warning('Ignoring symlink %s', filepath)
      else:
        destination = os.path.join(base_directory, filepath)
        # os.symlink() doesn't exist on Windows.
        os.symlink(properties['l'], destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000227
228
def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size. When |size| is UNKNOWN_FILE_SIZE,
  only the file's existence is checked.
  """
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if actual_size == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
243
244
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Hex digest of the content; computed lazily by prepare() when None.
    self.digest = digest
    # Size in bytes of the content; computed lazily by prepare() when None.
    self.size = size
    # Whether this item should jump the upload queue.
    self.high_priority = high_priority
    # Default zlib compression level; subclasses may override per item.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size is already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    total = 0
    for chunk in self.content():
      hasher.update(chunk)
      total += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = total
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000284
285
class FileItem(Item):
  """A file on disk to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Only stat() the file when the caller didn't supply the size.
    if size is None:
      size = os.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    # Stream the file lazily in chunks.
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000303
304
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    # The whole buffer is served as a single chunk.
    return [self.buffer]
314
315
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe.
  """

  def __init__(self, storage_api):
    # Namespace-derived settings are resolved once up front; thread pools are
    # created lazily on first use (see cpu_thread_pool / net_thread_pool).
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """Location of a backing store that this class is using.

    Exact meaning depends on the storage_api type. For IsolateServer it is
    an URL of isolate server, for FileSystem is it a path in file system.
    """
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping.

    Created lazily on first access (and recreated if accessed after close()).
    """
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError.

    Created lazily on first access (and recreated if accessed after close()).
    """
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish.

    Resets the pools to None so a later access would lazily recreate them.
    """
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface.

    Returns False so any exception raised inside the 'with' block propagates.
    """
    self.close()
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(vadimsh): Optimize special case of len(items) == 1 that is frequently
    # used by swarming.py. There's no need to spawn multiple threads and try to
    # do stuff in parallel: there's nothing to parallelize. 'contains' check and
    # 'push' should be performed sequentially in the context of current thread.

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d duplicated files', duplicates)

    # Enqueue all upload tasks: each missing item is pushed asynchronously and
    # reports completion back through |channel|.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns an URL that can be used to fetch given item once it's uploaded.

    Note that if namespace uses compression, data at given URL is compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Surface the failure to whoever is pulling from |channel|.
        channel.send_exception()
        return
      # Zipping succeeded; hand the single assembled buffer to the net pool.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH,
          self._storage_api.contains, batch)
      pending += 1

    # Yield results as they come in; each pull() returns one batch's
    # {missing item: push_state} mapping.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000619
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000620
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800621def batch_items_for_check(items):
622 """Splits list of items to check for existence on the server into batches.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000623
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800624 Each batch corresponds to a single 'exists?' query to the server via a call
625 to StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000626
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800627 Arguments:
628 items: a list of Item objects.
629
630 Yields:
631 Batches of items to query for existence in a single operation,
632 each batch is a list of Item objects.
633 """
634 batch_count = 0
635 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
636 next_queries = []
637 for item in sorted(items, key=lambda x: x.size, reverse=True):
638 next_queries.append(item)
639 if len(next_queries) == batch_size_limit:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000640 yield next_queries
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800641 next_queries = []
642 batch_count += 1
643 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
644 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
645 if next_queries:
646 yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000647
648
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000649class FetchQueue(object):
650 """Fetches items from Storage and places them into LocalCache.
651
652 It manages multiple concurrent fetch operations. Acts as a bridge between
653 Storage and LocalCache so that Storage and LocalCache don't depend on each
654 other at all.
655 """
656
657 def __init__(self, storage, cache):
658 self.storage = storage
659 self.cache = cache
660 self._channel = threading_utils.TaskChannel()
661 self._pending = set()
662 self._accessed = set()
663 self._fetched = cache.cached_set()
664
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400665 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700666 self,
667 digest,
668 size=UNKNOWN_FILE_SIZE,
669 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000670 """Starts asynchronous fetch of item |digest|."""
671 # Fetching it now?
672 if digest in self._pending:
673 return
674
675 # Mark this file as in use, verify_all_cached will later ensure it is still
676 # in cache.
677 self._accessed.add(digest)
678
679 # Already fetched? Notify cache to update item's LRU position.
680 if digest in self._fetched:
681 # 'touch' returns True if item is in cache and not corrupted.
682 if self.cache.touch(digest, size):
683 return
684 # Item is corrupted, remove it from cache and fetch it again.
685 self._fetched.remove(digest)
686 self.cache.evict(digest)
687
688 # TODO(maruel): It should look at the free disk space, the current cache
689 # size and the size of the new item on every new item:
690 # - Trim the cache as more entries are listed when free disk space is low,
691 # otherwise if the amount of data downloaded during the run > free disk
692 # space, it'll crash.
693 # - Make sure there's enough free disk space to fit all dependencies of
694 # this run! If not, abort early.
695
696 # Start fetching.
697 self._pending.add(digest)
698 self.storage.async_fetch(
699 self._channel, priority, digest, size,
700 functools.partial(self.cache.write, digest))
701
702 def wait(self, digests):
703 """Starts a loop that waits for at least one of |digests| to be retrieved.
704
705 Returns the first digest retrieved.
706 """
707 # Flush any already fetched items.
708 for digest in digests:
709 if digest in self._fetched:
710 return digest
711
712 # Ensure all requested items are being fetched now.
713 assert all(digest in self._pending for digest in digests), (
714 digests, self._pending)
715
716 # Wait for some requested item to finish fetching.
717 while self._pending:
718 digest = self._channel.pull()
719 self._pending.remove(digest)
720 self._fetched.add(digest)
721 if digest in digests:
722 return digest
723
724 # Should never reach this point due to assert above.
725 raise RuntimeError('Impossible state')
726
727 def inject_local_file(self, path, algo):
728 """Adds local file to the cache as if it was fetched from storage."""
729 with open(path, 'rb') as f:
730 data = f.read()
731 digest = algo(data).hexdigest()
732 self.cache.write(digest, [data])
733 self._fetched.add(digest)
734 return digest
735
736 @property
737 def pending_count(self):
738 """Returns number of items to be fetched."""
739 return len(self._pending)
740
741 def verify_all_cached(self):
742 """True if all accessed items are in cache."""
743 return self._accessed.issubset(self.cache.cached_set())
744
745
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache.

  Re-yields the chunks of |stream|, withholding each chunk until the next one
  arrives so the final chunk can be size-checked before the consumer sees it.
  """

  def __init__(self, stream, expected_size):
    """
    Arguments:
      stream: iterable of str chunks (e.g. as produced by StorageApi.fetch).
      expected_size: total expected size in bytes, or UNKNOWN_FILE_SIZE to
          skip the size verification.
    """
    self.stream = stream
    self.expected_size = expected_size
    # Bytes seen so far; compared against |expected_size| on the last chunk.
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Previously this verify-and-wrap logic was duplicated for the last chunk;
    # the lookahead helper folds both cases into one loop.
    for chunk, is_last in self._chunks_with_eof_flag():
      self._inspect_chunk(chunk, is_last=is_last)
      try:
        yield chunk
      except IOError as exc:
        # The consumer (local cache) failed, not the fetch itself: convert to
        # MappingError so Storage does not pointlessly refetch.
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _chunks_with_eof_flag(self):
    """Yields (chunk, is_last) pairs, reading one chunk ahead of the consumer.

    The one-chunk lookahead lets run() know, before releasing a chunk,
    whether it is the final one and thus whether the size check must fire.
    """
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        yield stored, False
      stored = chunk
    if stored is not None:
      yield stored, True

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
792
793
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000794class StorageApi(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800795 """Interface for classes that implement low-level storage operations.
796
797 StorageApi is oblivious of compression and hashing scheme used. This details
798 are handled in higher level Storage class.
799
800 Clients should generally not use StorageApi directly. Storage class is
801 preferred since it implements compression and upload optimizations.
802 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000803
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700804 @property
805 def location(self):
806 """Location of a backing store that this class is using.
807
808 Exact meaning depends on the type. For IsolateServer it is an URL of isolate
809 server, for FileSystem is it a path in file system.
810 """
811 raise NotImplementedError()
812
813 @property
814 def namespace(self):
815 """Isolate namespace used by this storage.
816
817 Indirectly defines hashing scheme and compression method used.
818 """
819 raise NotImplementedError()
820
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000821 def get_fetch_url(self, digest):
822 """Returns an URL that can be used to fetch an item with given digest.
823
824 Arguments:
825 digest: hex digest of item to fetch.
826
827 Returns:
828 An URL or None if the protocol doesn't support this.
829 """
830 raise NotImplementedError()
831
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800832 def fetch(self, digest, offset=0):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000833 """Fetches an object and yields its content.
834
835 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000836 digest: hash digest of item to download.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800837 offset: offset (in bytes) from the start of the file to resume fetch from.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000838
839 Yields:
840 Chunks of downloaded item (as str objects).
841 """
842 raise NotImplementedError()
843
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800844 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000845 """Uploads an |item| with content generated by |content| generator.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000846
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800847 |item| MUST go through 'contains' call to get |push_state| before it can
848 be pushed to the storage.
849
850 To be clear, here is one possible usage:
851 all_items = [... all items to push as Item subclasses ...]
852 for missing_item, push_state in storage_api.contains(all_items).items():
853 storage_api.push(missing_item, push_state)
854
855 When pushing to a namespace with compression, data that should be pushed
856 and data provided by the item is not the same. In that case |content| is
857 not None and it yields chunks of compressed data (using item.content() as
858 a source of original uncompressed data). This is implemented by Storage
859 class.
860
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000861 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000862 item: Item object that holds information about an item being pushed.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800863 push_state: push state object as returned by 'contains' call.
864 content: a generator that yields chunks to push, item.content() if None.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000865
866 Returns:
867 None.
868 """
869 raise NotImplementedError()
870
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000871 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800872 """Checks for |items| on the server, prepares missing ones for upload.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000873
874 Arguments:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800875 items: list of Item objects to check for presence.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000876
877 Returns:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800878 A dict missing Item -> opaque push state object to be passed to 'push'.
879 See doc string for 'push'.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000880 """
881 raise NotImplementedError()
882
883
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800884class _IsolateServerPushState(object):
885 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -0500886
887 Note this needs to be a global class to support pickling.
888 """
889
890 def __init__(self, upload_url, finalize_url):
891 self.upload_url = upload_url
892 self.finalize_url = finalize_url
893 self.uploaded = False
894 self.finalized = False
895
896
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000897class IsolateServer(StorageApi):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000898 """StorageApi implementation that downloads and uploads to Isolate Server.
899
900 It uploads and downloads directly from Google Storage whenever appropriate.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800901 Works only within single namespace.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000902 """
903
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000904 def __init__(self, base_url, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000905 super(IsolateServer, self).__init__()
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000906 assert base_url.startswith('http'), base_url
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700907 self._base_url = base_url.rstrip('/')
908 self._namespace = namespace
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000909 self._lock = threading.Lock()
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000910 self._server_caps = None
911
912 @staticmethod
913 def _generate_handshake_request():
914 """Returns a dict to be sent as handshake request body."""
915 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
916 return {
917 'client_app_version': __version__,
918 'fetcher': True,
919 'protocol_version': ISOLATE_PROTOCOL_VERSION,
920 'pusher': True,
921 }
922
923 @staticmethod
924 def _validate_handshake_response(caps):
925 """Validates and normalizes handshake response."""
926 logging.info('Protocol version: %s', caps['protocol_version'])
927 logging.info('Server version: %s', caps['server_app_version'])
928 if caps.get('error'):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400929 raise isolated_format.MappingError(caps['error'])
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000930 if not caps['access_token']:
931 raise ValueError('access_token is missing')
932 return caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000933
934 @property
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000935 def _server_capabilities(self):
936 """Performs handshake with the server if not yet done.
937
938 Returns:
939 Server capabilities dictionary as returned by /handshake endpoint.
940
941 Raises:
942 MappingError if server rejects the handshake.
943 """
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000944 # TODO(maruel): Make this request much earlier asynchronously while the
945 # files are being enumerated.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800946
947 # TODO(vadimsh): Put |namespace| in the URL so that server can apply
948 # namespace-level ACLs to this call.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000949 with self._lock:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000950 if self._server_caps is None:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000951 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -0400952 caps = net.url_read_json(
953 url=self._base_url + '/content-gs/handshake',
954 data=self._generate_handshake_request())
955 if caps is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400956 raise isolated_format.MappingError('Failed to perform handshake.')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000957 if not isinstance(caps, dict):
958 raise ValueError('Expecting JSON dict')
959 self._server_caps = self._validate_handshake_response(caps)
960 except (ValueError, KeyError, TypeError) as exc:
961 # KeyError exception has very confusing str conversion: it's just a
962 # missing key value and nothing else. So print exception class name
963 # as well.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400964 raise isolated_format.MappingError(
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -0400965 'Invalid handshake response (%s): %s' % (
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000966 exc.__class__.__name__, exc))
967 return self._server_caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000968
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700969 @property
970 def location(self):
971 return self._base_url
972
973 @property
974 def namespace(self):
975 return self._namespace
976
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000977 def get_fetch_url(self, digest):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000978 assert isinstance(digest, basestring)
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000979 return '%s/content-gs/retrieve/%s/%s' % (
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700980 self._base_url, self._namespace, digest)
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000981
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800982 def fetch(self, digest, offset=0):
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000983 source_url = self.get_fetch_url(digest)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800984 logging.debug('download_file(%s, %d)', source_url, offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000985
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000986 connection = net.url_open(
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800987 source_url,
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800988 read_timeout=DOWNLOAD_READ_TIMEOUT,
989 headers={'Range': 'bytes=%d-' % offset} if offset else None)
990
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000991 if not connection:
Vadim Shtayurae34e13a2014-02-02 11:23:26 -0800992 raise IOError('Request failed - %s' % source_url)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800993
994 # If |offset| is used, verify server respects it by checking Content-Range.
995 if offset:
996 content_range = connection.get_header('Content-Range')
997 if not content_range:
998 raise IOError('Missing Content-Range header')
999
1000 # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
1001 # According to a spec, <size> can be '*' meaning "Total size of the file
1002 # is not known in advance".
1003 try:
1004 match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
1005 if not match:
1006 raise ValueError()
1007 content_offset = int(match.group(1))
1008 last_byte_index = int(match.group(2))
1009 size = None if match.group(3) == '*' else int(match.group(3))
1010 except ValueError:
1011 raise IOError('Invalid Content-Range header: %s' % content_range)
1012
1013 # Ensure returned offset equals requested one.
1014 if offset != content_offset:
1015 raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
1016 offset, content_offset, content_range))
1017
1018 # Ensure entire tail of the file is returned.
1019 if size is not None and last_byte_index + 1 != size:
1020 raise IOError('Incomplete response. Content-Range: %s' % content_range)
1021
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001022 return stream_read(connection, NET_IO_FILE_CHUNK)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001023
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001024 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001025 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001026 assert item.digest is not None
1027 assert item.size is not None
1028 assert isinstance(push_state, _IsolateServerPushState)
1029 assert not push_state.finalized
1030
1031 # Default to item.content().
1032 content = item.content() if content is None else content
1033
1034 # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
1035 if isinstance(content, basestring):
1036 assert not isinstance(content, unicode), 'Unicode string is not allowed'
1037 content = [content]
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001038
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001039 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1040 # If |content| is indeed a generator, it can not be re-winded back
1041 # to the beginning of the stream. A retry will find it exhausted. A possible
1042 # solution is to wrap |content| generator with some sort of caching
1043 # restartable generator. It should be done alongside streaming support
1044 # implementation.
1045
1046 # This push operation may be a retry after failed finalization call below,
1047 # no need to reupload contents in that case.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001048 if not push_state.uploaded:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001049 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
1050 # upload support is implemented.
1051 if isinstance(content, list) and len(content) == 1:
1052 content = content[0]
1053 else:
1054 content = ''.join(content)
1055 # PUT file to |upload_url|.
1056 response = net.url_read(
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001057 url=push_state.upload_url,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001058 data=content,
1059 content_type='application/octet-stream',
1060 method='PUT')
1061 if response is None:
1062 raise IOError('Failed to upload a file %s to %s' % (
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001063 item.digest, push_state.upload_url))
1064 push_state.uploaded = True
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001065 else:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001066 logging.info(
1067 'A file %s already uploaded, retrying finalization only', item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001068
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001069 # Optionally notify the server that it's done.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001070 if push_state.finalize_url:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001071 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1072 # send it to isolated server. That way isolate server can verify that
1073 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1074 # stored files).
Marc-Antoine Ruelc1c2ccc2014-08-13 19:18:49 -04001075 # TODO(maruel): Fix the server to accept propery data={} so
1076 # url_read_json() can be used.
1077 response = net.url_read(
1078 url=push_state.finalize_url,
1079 data='',
1080 content_type='application/json',
1081 method='POST')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001082 if response is None:
1083 raise IOError('Failed to finalize an upload of %s' % item.digest)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001084 push_state.finalized = True
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001085
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001086 def contains(self, items):
1087 logging.info('Checking existence of %d files...', len(items))
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001088
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001089 # Ensure all items were initialized with 'prepare' call. Storage does that.
1090 assert all(i.digest is not None and i.size is not None for i in items)
1091
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001092 # Request body is a json encoded list of dicts.
1093 body = [
1094 {
1095 'h': item.digest,
1096 's': item.size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001097 'i': int(item.high_priority),
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001098 } for item in items
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001099 ]
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001100
1101 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001102 self._base_url,
1103 self._namespace,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001104 urllib.quote(self._server_capabilities['access_token']))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001105
1106 # Response body is a list of push_urls (or null if file is already present).
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001107 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001108 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001109 response = net.url_read_json(url=query_url, data=body)
1110 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001111 raise isolated_format.MappingError(
1112 'Failed to execute /pre-upload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001113 if not isinstance(response, list):
1114 raise ValueError('Expecting response with json-encoded list')
1115 if len(response) != len(items):
1116 raise ValueError(
1117 'Incorrect number of items in the list, expected %d, '
1118 'but got %d' % (len(items), len(response)))
1119 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001120 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001121 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001122
1123 # Pick Items that are missing, attach _PushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001124 missing_items = {}
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001125 for i, push_urls in enumerate(response):
1126 if push_urls:
1127 assert len(push_urls) == 2, str(push_urls)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001128 missing_items[items[i]] = _IsolateServerPushState(
1129 push_urls[0], push_urls[1])
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001130 logging.info('Queried %d files, %d cache hit',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001131 len(items), len(items) - len(missing_items))
1132 return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001133
1134
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001135class FileSystem(StorageApi):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001136 """StorageApi implementation that fetches data from the file system.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001137
1138 The common use case is a NFS/CIFS file server that is mounted locally that is
1139 used to fetch the file on a local partition.
1140 """
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001141
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001142 # Used for push_state instead of None. That way caller is forced to
1143 # call 'contains' before 'push'. Naively passing None in 'push' will not work.
1144 _DUMMY_PUSH_STATE = object()
1145
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001146 def __init__(self, base_path, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001147 super(FileSystem, self).__init__()
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001148 self._base_path = base_path
1149 self._namespace = namespace
1150
1151 @property
1152 def location(self):
1153 return self._base_path
1154
1155 @property
1156 def namespace(self):
1157 return self._namespace
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001158
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001159 def get_fetch_url(self, digest):
1160 return None
1161
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001162 def fetch(self, digest, offset=0):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001163 assert isinstance(digest, basestring)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001164 return file_read(os.path.join(self._base_path, digest), offset=offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001165
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001166 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001167 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001168 assert item.digest is not None
1169 assert item.size is not None
1170 assert push_state is self._DUMMY_PUSH_STATE
1171 content = item.content() if content is None else content
1172 if isinstance(content, basestring):
1173 assert not isinstance(content, unicode), 'Unicode string is not allowed'
1174 content = [content]
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001175 file_write(os.path.join(self._base_path, item.digest), content)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001176
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001177 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001178 assert all(i.digest is not None and i.size is not None for i in items)
1179 return dict(
1180 (item, self._DUMMY_PUSH_STATE) for item in items
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001181 if not os.path.exists(os.path.join(self._base_path, item.digest))
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001182 )
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001183
1184
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001185class LocalCache(object):
1186 """Local cache that stores objects fetched via Storage.
1187
1188 It can be accessed concurrently from multiple threads, so it should protect
1189 its internal state with some lock.
1190 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001191 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001192
1193 def __enter__(self):
1194 """Context manager interface."""
1195 return self
1196
1197 def __exit__(self, _exc_type, _exec_value, _traceback):
1198 """Context manager interface."""
1199 return False
1200
1201 def cached_set(self):
1202 """Returns a set of all cached digests (always a new object)."""
1203 raise NotImplementedError()
1204
1205 def touch(self, digest, size):
1206 """Ensures item is not corrupted and updates its LRU position.
1207
1208 Arguments:
1209 digest: hash digest of item to check.
1210 size: expected size of this item.
1211
1212 Returns:
1213 True if item is in cache and not corrupted.
1214 """
1215 raise NotImplementedError()
1216
1217 def evict(self, digest):
1218 """Removes item from cache if it's there."""
1219 raise NotImplementedError()
1220
1221 def read(self, digest):
1222 """Returns contents of the cached item as a single str."""
1223 raise NotImplementedError()
1224
1225 def write(self, digest, content):
1226 """Reads data from |content| generator and stores it in cache."""
1227 raise NotImplementedError()
1228
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001229 def hardlink(self, digest, dest, file_mode):
1230 """Ensures file at |dest| has same content as cached |digest|.
1231
1232 If file_mode is provided, it is used to set the executable bit if
1233 applicable.
1234 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001235 raise NotImplementedError()
1236
1237
1238class MemoryCache(LocalCache):
1239 """LocalCache implementation that stores everything in memory."""
1240
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001241 def __init__(self, file_mode_mask=0500):
1242 """Args:
1243 file_mode_mask: bit mask to AND file mode with. Default value will make
1244 all mapped files to be read only.
1245 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001246 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001247 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001248 # Let's not assume dict is thread safe.
1249 self._lock = threading.Lock()
1250 self._contents = {}
1251
1252 def cached_set(self):
1253 with self._lock:
1254 return set(self._contents)
1255
1256 def touch(self, digest, size):
1257 with self._lock:
1258 return digest in self._contents
1259
1260 def evict(self, digest):
1261 with self._lock:
1262 self._contents.pop(digest, None)
1263
1264 def read(self, digest):
1265 with self._lock:
1266 return self._contents[digest]
1267
1268 def write(self, digest, content):
1269 # Assemble whole stream before taking the lock.
1270 data = ''.join(content)
1271 with self._lock:
1272 self._contents[digest] = data
1273
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001274 def hardlink(self, digest, dest, file_mode):
1275 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001276 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001277 if file_mode is not None:
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001278 os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001279
1280
class Settings(object):
  """Results of a completely parsed .isolated file."""
  def __init__(self):
    # Command to run, as a list of arguments, once all files are mapped.
    self.command = []
    # Aggregated file metadata from the whole include tree,
    # relative path -> properties dict.
    self.files = {}
    # Tri-state; None means no .isolated file specified it.
    self.read_only = None
    # Directory, relative to the mapped root, to run the command from.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def load(self, fetch_queue, root_isolated_hash, algo):
    """Loads the .isolated and all the included .isolated asynchronously.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()

    def retrieve(isolated_file):
      # Enqueues |isolated_file| for fetching; rejects include cycles.
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    retrieve(self.root)

    while pending:
      # Process whichever .isolated finished fetching first.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))
      if item_hash == root_isolated_hash:
        # It's the root item.
        item.can_fetch = True

      for new_child in item.children:
        retrieve(new_child)

      # Traverse the whole tree to see if files can now be fetched.
      self._traverse_tree(fetch_queue, self.root)

    def check(n):
      # Sanity check: every node in the tree must have had its files enqueued.
      return all(check(x) for x in n.children) and n.files_fetched
    assert check(self.root)

    self.relative_cwd = self.relative_cwd or ''

  def _traverse_tree(self, fetch_queue, node):
    # Walks the include tree left-to-right, enqueuing files for nodes that
    # became fetchable and unlocking at most one new child per pass.
    if node.can_fetch:
      if not node.files_fetched:
        self._update_self(fetch_queue, node)
      will_break = False
      for i in node.children:
        if not i.can_fetch:
          if will_break:
            break
          # Automatically mark the first one as fetcheable.
          i.can_fetch = True
          will_break = True
        self._traverse_tree(fetch_queue, i)

  def _update_self(self, fetch_queue, node):
    # Enqueues |node|'s files and folds its properties into this Settings;
    # earlier (left-most) nodes win for command/read_only/relative_cwd.
    node.fetch_files(fetch_queue, self.files)
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1376
1377
def get_storage_api(file_or_url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for
  a better alternative.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of isolate
        service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  # Pick the backend based on the shape of the destination.
  cls = IsolateServer if file_path.is_url(file_or_url) else FileSystem
  return cls(file_or_url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001399
1400
def get_storage(file_or_url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of isolate
        service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  api = get_storage_api(file_or_url, namespace)
  return Storage(api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001415
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001416
def upload_tree(base_url, indir, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The base url, it is assume that |base_url|/has/ can be used to
              query if an element was already uploaded, and |base_url|/store/
              can be used to upload a new element.
    indir: Root directory the infiles are based in.
    infiles: dict of files to upload from |indir| to |base_url|.
    namespace: The namespace to use on the server.
  """
  logging.info('upload_tree(indir=%s, files=%d)', indir, len(infiles))

  # Turn |indir| + |infiles| into FileItem objects. Symlinks are skipped since
  # they are not represented by items on the isolate server side.
  items = []
  for filepath, metadata in infiles.iteritems():
    if 'l' in metadata:
      continue
    items.append(
        FileItem(
            path=os.path.join(indir, filepath),
            digest=metadata['h'],
            size=metadata['s'],
            high_priority=metadata.get('priority') == '0'))

  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)
  return 0
Vadim Shtayura3148e072014-09-02 18:51:52 -07001446
1447
def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    Settings object that holds details about loaded *.isolated file.

  Raises:
    isolated_format.IsolatedError: if require_command is set and no command is
        found in the .isolated tree.
    isolated_format.MappingError: if |isolated_hash| is neither a valid hash
        nor a readable file, or if the cache evicted needed items.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    settings = Settings()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        try:
          isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intent to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading rest of the files.
      settings.load(fetch_queue, isolated_hash, algo)
      if require_command and not settings.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, settings.files)
      create_symlinks(outdir, settings.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, settings.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in settings.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return settings
1535
1536
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  relpaths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {}
  for relpath in relpaths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, False, algo)
    # Timestamps are irrelevant for content-addressed storage.
    meta.pop('t')
    metadata[relpath] = meta
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      # Entries without a content hash (e.g. symlinks) are not uploaded.
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
1558
1559
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    List of (hash, path) tuples, one per entry in |files|; for a directory the
    hash is that of the generated .isolated file describing it.

  Raises:
    Error: on duplicate entries or entries that are neither files nor
        directories or cannot be read.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError as e:
        # Keep the OS-level detail; it is invaluable when diagnosing failures
        # (permission denied, disappearing file, etc).
        raise Error('Failed to process %s: %s' % (f, e))
    # Technically we would care about which files were uploaded but we don't
    # much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    if tempdir:
      shutil.rmtree(tempdir)
1629
1630
def archive(out, namespace, files, blacklist):
  """Uploads |files| to the storage at |out| and prints 'hash path' pairs.

  Arguments:
    out: isolate server URL, or a directory for file system based storage.
    namespace: namespace to store the items under.
    files: list of paths to upload; the single entry '-' means one path per
        line on stdin.
    blacklist: list of regexps of files to skip when uploading directories.

  Raises:
    Error: when there is nothing to upload or an entry cannot be processed.
  """
  if files == ['-']:
    # readlines() keeps the trailing newline; it must be stripped or it ends
    # up embedded in the file name and the path lookup fails. Blank lines are
    # skipped.
    files = [l.rstrip('\r\n') for l in sys.stdin.readlines() if l.strip()]

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, files, blacklist)
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1643
1644
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  When given a directory, a .isolated file describing it is generated and the
  whole directory is uploaded; that .isolated file can then be included in
  another one to run commands.

  Each processed entry is printed along with its content hash. A directory is
  reported through the hash of the .isolated file generated for it.
  """
  add_isolate_server_options(parser, False)
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if file_path.is_url(options.isolate_server):
    # Talking to a real server: make sure credentials are set up first.
    auth.ensure_logged_in(options.isolate_server)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as exc:
    parser.error(exc.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001672
1673
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser, True)
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)

  remote = options.isolate_server or options.indir
  if file_path.is_url(remote):
    auth.ensure_logged_in(remote)

  with get_storage(remote, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      settings = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=MemoryCache(),
          outdir=options.target,
          require_command=False)
      # |rel| is already an absolute path since options.target is absolute;
      # the previous code joined it with options.target a second time, which
      # only worked by accident of os.path.join() semantics.
      rel = os.path.join(options.target, settings.relative_cwd)
      print('To run this test please run from the directory %s:' % rel)
      print(' ' + ' '.join(settings.command))

  return 0
1736
1737
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDhashtable(parser, args):
  """Archives data to a hashtable on the file system.

  When given a directory, a .isolated file describing it is generated and the
  whole directory is stored; that .isolated file can then be included in
  another one to run commands.

  Each processed entry is printed along with its content hash. A directory is
  reported through the hash of the .isolated file generated for it.
  """
  add_outdir_options(parser)
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)
  process_outdir_options(parser, options, os.getcwd())
  try:
    # Do not compress files when archiving to the file system.
    archive(options.outdir, 'default', files, options.blacklist)
  except Error as exc:
    parser.error(exc.args[0])
  return 0
1764
1765
def add_isolate_server_options(parser, add_indir):
  """Adds --isolate-server and --namespace options to parser.

  Includes --indir if desired.
  """
  server_help = (
      'URL of the Isolate Server to use. Defaults to the environment '
      'variable ISOLATE_SERVER if set. No need to specify https://, this '
      'is assumed.')
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help=server_help)
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
  if add_indir:
    parser.add_option(
        '--indir', metavar='DIR',
        help='Directory used to store the hashtable instead of using an '
             'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001785
1786
def process_isolate_server_options(parser, options):
  """Processes the --isolate-server and --indir options and aborts if neither is
  specified.
  """
  has_indir = hasattr(options, 'indir')
  if options.isolate_server:
    if has_indir and options.indir:
      parser.error('Use only one of --indir or --isolate-server.')
  elif not has_indir:
    parser.error('--isolate-server is required.')
  elif not options.indir:
    parser.error('Use one of --indir or --isolate-server.')

  if options.isolate_server:
    parts = urlparse.urlparse(options.isolate_server, 'https')
    if parts.query:
      parser.error('--isolate-server doesn\'t support query parameter.')
    if parts.fragment:
      parser.error('--isolate-server doesn\'t support fragment in the url.')
    # A bare host like 'foo.com' parses as netloc='' and path='foo.com', which
    # is not what is desired here; shuffle the host into the netloc slot.
    pieces = list(parts)
    if not pieces[1] and pieces[2]:
      pieces[1] = pieces[2].rstrip('/')
      pieces[2] = ''
    pieces[2] = pieces[2].rstrip('/')
    options.isolate_server = urlparse.urlunparse(pieces)
    on_error.report_on_exception_exit(options.isolate_server)
    return

  if file_path.is_url(options.indir):
    parser.error('Can\'t use an URL for --indir.')
  options.indir = unicode(options.indir).replace('/', os.path.sep)
  options.indir = os.path.abspath(
      os.path.normpath(os.path.join(os.getcwd(), options.indir)))
  if not os.path.isdir(options.indir):
    parser.error('Path given to --indir must exist.')
1825
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001826
def add_outdir_options(parser):
  """Adds --outdir, which is orthogonal to --isolate-server.

  Note: On upload, separate commands are used between 'archive' and 'hashtable'.
  On 'download', the same command can download from either an isolate server or
  a file system.
  """
  parser.add_option(
      '-o', '--outdir', metavar='DIR',
      help='Directory used to recreate the tree.')
1837
1838
def process_outdir_options(parser, options, cwd):
  """Validates --outdir and normalizes it to an absolute native path."""
  outdir = options.outdir
  if not outdir:
    parser.error('--outdir is required.')
  if file_path.is_url(outdir):
    parser.error('Can\'t use an URL for --outdir.')
  # The flag may have been typed with '/' on any platform.
  outdir = unicode(outdir).replace('/', os.path.sep)
  # outdir doesn't need native path case since tracing is never done from there.
  options.outdir = os.path.abspath(os.path.normpath(os.path.join(cwd, outdir)))
  # The directory is deliberately not created here; defer it until the rest of
  # the command line is known to be valid.
1850
1851
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser that wires in logging and authentication flags."""

  def __init__(self, **kwargs):
    prog = os.path.basename(sys.modules[__name__].__file__)
    tools.OptionParserWithLogging.__init__(
        self, version=__version__, prog=prog, **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    # Let the base class parse first, then consume the auth related flags.
    options, remaining = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, remaining
1866
1867
def main(args):
  """Dispatches |args| to the matching CMD* subcommand handler."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001871
1872
if __name__ == '__main__':
  # Normalize stream encodings before any output is produced.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # Enable ANSI color sequence handling (notably on Windows consoles).
  colorama.init()
  sys.exit(main(sys.argv[1:]))
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001877 sys.exit(main(sys.argv[1:]))