blob: 2f800ff9208d428dbd4fa75227c442b8d44e34c6 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04008__version__ = '0.3.4'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000013import re
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050014import shutil
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040015import signal
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050017import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000018import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000019import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000020import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050021import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000022import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000023
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000024from third_party import colorama
25from third_party.depot_tools import fix_encoding
26from third_party.depot_tools import subcommand
27
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050028from utils import file_path
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000029from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040030from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000031from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000032from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000033
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080034import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040035import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080036
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000037
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
# NOTE(review): entries are bare extensions without a leading dot; callers
# comparing against os.path.splitext() output must strip the dot first.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regular expressions of file paths to ignore by default when archiving.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
  r'^.+\.(?:run_test_cases)$',
  r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
103
104
class Error(Exception):
  """Base class for runtime errors raised by this module."""
108
109
class Aborted(Error):
  """Raised when an in-flight operation is cancelled (e.g. via Ctrl+C)."""
113
114
def stream_read(stream, chunk_size):
  """Yields successive chunks of at most |chunk_size| read from |stream|.

  Stops as soon as the stream returns an empty (falsy) read.
  """
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
122
123
def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|.

  The file is opened in binary mode and closed when the generator is exhausted.
  """
  with open(filepath, 'rb') as f:
    if offset:
      f.seek(offset)
    chunk = f.read(chunk_size)
    while chunk:
      yield chunk
      chunk = f.read(chunk_size)
134
135
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  import errno  # Local import to keep the file-level import block untouched.
  filedir = os.path.dirname(filepath)
  # A bare filename has no directory component; nothing to create then.
  if filedir:
    # EAFP instead of isdir()+makedirs(): this module writes files from
    # multiple threads, and the check-then-create pair is racy. Only a real
    # failure (permissions, a file squatting on the path) should propagate.
    try:
      os.makedirs(filedir)
    except OSError as e:
      if e.errno != errno.EEXIST or not os.path.isdir(filedir):
        raise
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000154
155
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  # Flush whatever is still buffered inside zlib to terminate the stream.
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
166
167
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  total_in = 0
  try:
    for piece in content_generator:
      total_in += len(piece)
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      # Keep draining whatever zlib buffered beyond |chunk_size|.
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (total_in, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
198
199
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store only) for file types that are already compressed,
  7 for everything else.
  """
  # os.path.splitext() keeps the leading dot ('.png'), while entries in
  # ALREADY_COMPRESSED_TYPES are bare extensions ('png'). Strip the dot so the
  # membership test can actually match; otherwise every already-compressed
  # file would needlessly be recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
205
206
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Build the closure of every ancestor directory that has to exist.
  to_create = set()
  for f in files:
    parent = os.path.dirname(f)
    while parent and parent not in to_create:
      to_create.add(parent)
      parent = os.path.dirname(parent)
  # Lexicographic order guarantees parents come before their children.
  for rel_dir in sorted(to_create):
    os.mkdir(os.path.join(base_directory, rel_dir))
219
220
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for relpath, properties in files:
    if 'l' not in properties:
      # Not a symlink entry; nothing to do.
      continue
    if sys.platform != 'win32':
      destination = os.path.join(base_directory, relpath)
      # os.symlink() doesn't exist on Windows.
      os.symlink(properties['l'], destination)  # pylint: disable=E1101
    else:
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000233
234
def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    # No expected size to compare against; existence is the best we can do.
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size == actual_size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
249
250
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Hex digest of the content; None until prepare() runs if not supplied.
    self.digest = digest
    # Size of the content in bytes; None until prepare() runs if not supplied.
    self.size = size
    # True to upload ahead of regular items (e.g. .isolated files).
    self.high_priority = high_priority
    # Default zlib level; subclasses may tune it per file type.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size is already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    length = 0
    for chunk in self.content():
      hasher.update(chunk)
      length += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = length
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000290
291
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Fall back to stat() only when the caller did not supply the size.
    if size is None:
      size = os.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    # Already-compressed file types are stored without recompression.
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000309
310
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Size is known up front; digest is computed lazily by prepare().
    super(BufferItem, self).__init__(
        digest=None, size=len(buf), high_priority=high_priority)
    self.buffer = buf

  def content(self):
    # A one-element list so it can be iterated more than once.
    return [self.buffer]
320
321
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    # Low-level StorageApi implementation that performs the actual transfers.
    self._storage_api = storage_api
    # Whether content is deflated before upload; derived from the namespace.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily; see cpu_thread_pool/net_thread_pool.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Set by abort(); read without locking (see comment in abort()).
    self._aborted = False
    # Signal handlers saved by __enter__ and restored by __exit__.
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """Location of a backing store that this class is using.

    Exact meaning depends on the storage_api type. For IsolateServer it is
    an URL of isolate server, for FileSystem is it a path in file system.
    """
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping. Created on first use."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish, then shuts down both pools."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly theadsafe, but in the worst case the logging message
    # will be printed twice. Not a big deal. In other places it is assumed that
    # unprotected reads and writes to _aborted are serializable (it is true
    # for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface.

    Installs SIGINT/SIGTERM handlers that route to abort(), saving the
    previous handlers so __exit__ can restore them.
    """
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface.

    Waits for pending work, then restores the original signal handlers.
    Never suppresses exceptions (always returns False).
    """
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
    logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns an URL that can be used to fetch given item once it's uploaded.

    Note that if namespace uses compression, data at given URL is compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Hand the compressed payload to the network pool for the actual upload.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
      * item - Item object that is missing (one of |items|).
      * push_state - opaque object that contains storage specific information
        describing how to upload the item (for example in case of cloud
        storage, it is signed upload URLs). It can later be passed to
        'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      # Raise instead of querying if the user already asked to abort.
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000651
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000652
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  # Batch sizes follow the ITEMS_PER_CONTAINS_QUERIES schedule; once the
  # schedule is exhausted the last entry is reused for all further batches.
  batches_emitted = 0
  limit = ITEMS_PER_CONTAINS_QUERIES[0]
  current = []
  # Largest items first, so expensive uploads are discovered early.
  for item in sorted(items, key=lambda i: i.size, reverse=True):
    current.append(item)
    if len(current) == limit:
      yield current
      current = []
      batches_emitted += 1
      limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batches_emitted, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if current:
    yield current
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000679
680
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    # Channel that receives digests of finished fetches from |storage|.
    self._channel = threading_utils.TaskChannel()
    # Digests currently being downloaded.
    self._pending = set()
    # Every digest ever requested through 'add'; used by verify_all_cached.
    self._accessed = set()
    # Digests already present in |cache| (seeded with its current content).
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|.

    Arguments:
      digest: hex digest of the item to fetch.
      size: expected size of the item, UNKNOWN_FILE_SIZE to skip size checks.
      priority: thread pool priority for the underlying fetch task.
    """
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Every digest in |digests| must either be already fetched or currently
    pending (i.e. previously passed to 'add'), otherwise the assert fires.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage.

    Arguments:
      path: path to a local file to read.
      algo: hashing constructor (e.g. hashlib.sha1) used to derive the digest.

    Returns:
      Hex digest of the injected content.
    """
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
776
777
778class FetchStreamVerifier(object):
779 """Verifies that fetched file is valid before passing it to the LocalCache."""
780
781 def __init__(self, stream, expected_size):
782 self.stream = stream
783 self.expected_size = expected_size
784 self.current_size = 0
785
786 def run(self):
787 """Generator that yields same items as |stream|.
788
789 Verifies |stream| is complete before yielding a last chunk to consumer.
790
791 Also wraps IOError produced by consumer into MappingError exceptions since
792 otherwise Storage will retry fetch on unrelated local cache errors.
793 """
794 # Read one chunk ahead, keep it in |stored|.
795 # That way a complete stream can be verified before pushing last chunk
796 # to consumer.
797 stored = None
798 for chunk in self.stream:
799 assert chunk is not None
800 if stored is not None:
801 self._inspect_chunk(stored, is_last=False)
802 try:
803 yield stored
804 except IOError as exc:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400805 raise isolated_format.MappingError(
806 'Failed to store an item in cache: %s' % exc)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000807 stored = chunk
808 if stored is not None:
809 self._inspect_chunk(stored, is_last=True)
810 try:
811 yield stored
812 except IOError as exc:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400813 raise isolated_format.MappingError(
814 'Failed to store an item in cache: %s' % exc)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000815
816 def _inspect_chunk(self, chunk, is_last):
817 """Called for each fetched chunk before passing it to consumer."""
818 self.current_size += len(chunk)
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -0400819 if (is_last and
Vadim Shtayura3148e072014-09-02 18:51:52 -0700820 (self.expected_size != UNKNOWN_FILE_SIZE) and
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000821 (self.expected_size != self.current_size)):
822 raise IOError('Incorrect file size: expected %d, got %d' % (
823 self.expected_size, self.current_size))
824
825
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious of the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """Location of a backing store that this class is using.

    Exact meaning depends on the type. For IsolateServer it is an URL of isolate
    server, for FileSystem is it a path in file system.
    """
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, data that should be pushed
    and data provided by the item is not the same. In that case |content| is
    not None and it yields chunks of compressed data (using item.content() as
    a source of original uncompressed data). This is implemented by Storage
    class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()
914
915
class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, upload_url, finalize_url):
    # URL the file content is PUT to by IsolateServer.push.
    self.upload_url = upload_url
    # URL POSTed to after a successful upload; may be falsy, in which case
    # IsolateServer.push skips the finalization step.
    self.finalize_url = finalize_url
    # Set to True by 'push' once content reached the server, so a retry after
    # a failed finalization does not re-upload the data.
    self.uploaded = False
    # Set to True once the finalize call succeeded; 'push' asserts it is
    # False on entry.
    self.finalized = False
927
928
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    # Guards lazy one-time initialization of |self._server_caps|.
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response.

    Raises:
      MappingError if the server reported an error.
      ValueError if the access token is missing.
      KeyError if an expected capability key is absent.
    """
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise isolated_format.MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    The result is cached; only the first access triggers a network call.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.
    with self._lock:
      if self._server_caps is None:
        try:
          caps = net.url_read_json(
              url=self._base_url + '/content-gs/handshake',
              data=self._generate_handshake_request())
          if caps is None:
            raise isolated_format.MappingError('Failed to perform handshake.')
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise isolated_format.MappingError(
              'Invalid handshake response (%s): %s' % (
                  exc.__class__.__name__, exc))
      return self._server_caps

  @property
  def location(self):
    # See StorageApi.location: the isolate server URL.
    return self._base_url

  @property
  def namespace(self):
    # See StorageApi.namespace.
    return self._namespace

  def get_fetch_url(self, digest):
    """Returns an URL to directly fetch an item with the given hex digest."""
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    """Fetches an object from the server, yielding chunks of its content.

    See StorageApi.fetch. When |offset| is non-zero the server's Content-Range
    response header is validated to make sure the resume request was honored.
    """
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    connection = net.url_open(
        source_url,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    """Uploads an item's content to the upload URL obtained from 'contains'.

    See StorageApi.push. Safe to retry: if a previous attempt uploaded the
    data but failed to finalize, only the finalization is repeated.
    """
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content

    # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, push_state.upload_url))
      push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      # TODO(maruel): Fix the server to accept propery data={} so
      # url_read_json() can be used.
      response = net.url_read(
          url=push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      push_state.finalized = True

  def contains(self, items):
    """Queries the server for missing items via /pre-upload.

    See StorageApi.contains. Returns a dict mapping each missing Item to an
    _IsolateServerPushState holding its upload and finalize URLs.
    """
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.high_priority),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self._base_url,
        self._namespace,
        urllib.quote(self._server_capabilities['access_token']))

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute /pre-upload query')
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        missing_items[items[i]] = _IsolateServerPushState(
            push_urls[0], push_urls[1])
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001163
1164
class FileSystem(StorageApi):
  """StorageApi implementation that fetches data from the file system.

  The common use case is a NFS/CIFS file server that is mounted locally that is
  used to fetch the file on a local partition.
  """

  # Used for push_state instead of None. That way caller is forced to
  # call 'contains' before 'push'. Naively passing None in 'push' will not work.
  _DUMMY_PUSH_STATE = object()

  def __init__(self, base_path, namespace):
    super(FileSystem, self).__init__()
    self._base_path = base_path
    self._namespace = namespace

  def _path_of(self, digest):
    """Returns the file system path backing the item with |digest|."""
    return os.path.join(self._base_path, digest)

  @property
  def location(self):
    return self._base_path

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    # Direct fetch URLs are not supported by the file system backend.
    return None

  def fetch(self, digest, offset=0):
    assert isinstance(digest, basestring)
    return file_read(self._path_of(digest), offset=offset)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    # Only the sentinel handed out by 'contains' is accepted.
    assert push_state is self._DUMMY_PUSH_STATE
    if content is None:
      content = item.content()
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      # Write a str as a single chunk instead of byte-by-byte iteration.
      content = [content]
    file_write(self._path_of(item.digest), content)

  def contains(self, items):
    assert all(i.digest is not None and i.size is not None for i in items)
    # An item is "missing" when no file with its digest name exists.
    missing = {}
    for item in items:
      if not os.path.exists(self._path_of(item.digest)):
        missing[item] = self._DUMMY_PUSH_STATE
    return missing
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001213
1214
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  # Directory backing this cache on disk, if any; None for in-memory caches.
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()
1266
1267
1268class MemoryCache(LocalCache):
1269 """LocalCache implementation that stores everything in memory."""
1270
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001271 def __init__(self, file_mode_mask=0500):
1272 """Args:
1273 file_mode_mask: bit mask to AND file mode with. Default value will make
1274 all mapped files to be read only.
1275 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001276 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001277 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001278 # Let's not assume dict is thread safe.
1279 self._lock = threading.Lock()
1280 self._contents = {}
1281
1282 def cached_set(self):
1283 with self._lock:
1284 return set(self._contents)
1285
1286 def touch(self, digest, size):
1287 with self._lock:
1288 return digest in self._contents
1289
1290 def evict(self, digest):
1291 with self._lock:
1292 self._contents.pop(digest, None)
1293
1294 def read(self, digest):
1295 with self._lock:
1296 return self._contents[digest]
1297
1298 def write(self, digest, content):
1299 # Assemble whole stream before taking the lock.
1300 data = ''.join(content)
1301 with self._lock:
1302 self._contents[digest] = data
1303
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001304 def hardlink(self, digest, dest, file_mode):
1305 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001306 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001307 if file_mode is not None:
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001308 os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001309
1310
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    # Command to run, as a list of strings (may stay empty).
    self.command = []
    # Merged mapping of relative file path -> properties dict, with the root
    # .isolated taking precedence over included ones.
    self.files = {}
    # Tri-state read-only mode from the .isolated data; None if unspecified.
    self.read_only = None
    # Relative working directory; normalized to '' after fetch() completes.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.

    Arguments:
      fetch_queue: FetchQueue used both to retrieve *.isolated files and to
          enqueue the data files they reference.
      root_isolated_hash: hash digest of the top-level *.isolated file.
      algo: hashing algorithm matching the isolate namespace in use.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already being fetched.
    processed = set()

    def retrieve_async(isolated_file):
      # Enqueues one *.isolated file for download at high priority.
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          # Entries without 'h' (e.g. symlinks) have no blob to download.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).

    First node to define a property wins: earlier (outer) .isolated files
    override later (included) ones.
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1432
1433
def get_storage_api(file_or_url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for
  a better alternative.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of isolate
      service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
      compression scheme used, i.e. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  # Pick the backend class based on what |file_or_url| looks like.
  backend = IsolateServer if file_path.is_url(file_or_url) else FileSystem
  return backend(file_or_url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001455
1456
def get_storage(file_or_url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of isolate
      service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
      compression scheme used, i.e. namespace names that end with '-gzip'
      store compressed data.

  Returns:
    Instance of Storage.
  """
  # Wrap the low-level API object into the higher-level Storage facade.
  api = get_storage_api(file_or_url, namespace)
  return Storage(api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001471
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001472
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into FileItem objects; drop duplicates and symlinks
  # ('l' entries), since symlinks are not represented by items on the isolate
  # server side.
  items = []
  seen_paths = set()
  skipped = 0
  for filepath, metadata in infiles:
    if 'l' in metadata or filepath in seen_paths:
      skipped += 1
      continue
    seen_paths.add(filepath)
    items.append(FileItem(
        path=filepath,
        digest=metadata['h'],
        size=metadata['s'],
        high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001502
1503
def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.

  Raises:
    isolated_format.MappingError: if |isolated_hash| is neither a valid hash
        nor a readable local file, or if the cache evicted needed items.
    isolated_format.IsolatedError: if require_command is True but no command
        was found in the fetched *.isolated files.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  # |cache| is a context manager; entering it prepares local storage.
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        try:
          isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intent to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)
      if require_command and not bundle.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      # Several paths may share one digest (identical file content); all of
      # them are linked once that digest lands in the cache.
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          # Throttled to at most one console update per
          # DELAY_BETWEEN_UPDATES_IN_SECS.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001591
1592
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  # Symlinks are only expanded on non-Windows platforms.
  follow_symlinks = sys.platform != 'win32'
  relpaths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, follow_symlinks)
  # Compute per-file metadata, dropping the timestamp entry which is not
  # wanted in the stored .isolated data.
  metadata = {}
  for relpath in relpaths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, False, algo)
    meta.pop('t')
    metadata[relpath] = meta
  # Build upload items for entries that carry a content hash; .isolated files
  # get high priority so dependent fetches can start sooner.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      continue
    items.append(FileItem(
        path=os.path.join(root, relpath),
        digest=meta['h'],
        size=meta['s'],
        high_priority=relpath.endswith('.isolated')))
  return items, metadata
1614
1615
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    List of (hash, path) pairs, one per entry in |files|, in the same order.

  Raises:
    Error: on duplicate entries, unreadable paths, or paths that are neither
        files nor directories.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          # mkstemp returns an open handle; close it right away since
          # save_isolated writes by path.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          # The generated .isolated is uploaded too, at high priority.
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          # The directory entry is reported via its .isolated hash.
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded but we don't
    # much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    # Always delete the temporary .isolated files, even on failure.
    if tempdir:
      shutil.rmtree(tempdir)
1685
1686
def archive(out, namespace, files, blacklist):
  """Uploads |files| to |out| and prints one 'hash path' line per entry."""
  # '-' means: read the list of files from stdin, one per line.
  if files == ['-']:
    files = sys.stdin.readlines()
  if not files:
    raise Error('Nothing to upload')

  decoded_files = [f.decode('utf-8') for f in files]
  blacklist_fn = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, decoded_files, blacklist_fn)
  print('\n'.join('%s %s' % entry for entry in results))
1699
1700
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created the whole directory
  is uploaded. Then this .isolated file can be included in another one to run
  commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  # No --indir support for archiving; only a real isolate server.
  add_isolate_server_options(parser, False)
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  server = options.isolate_server
  if file_path.is_url(server):
    auth.ensure_logged_in(server)
  try:
    archive(server, options.namespace, files, options.blacklist)
  except Error as e:
    # Surface upload failures as a parser error for a clean exit message.
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001728
1729
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser, True)
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Exactly one of the two modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)

  remote = options.isolate_server or options.indir
  if file_path.is_url(remote):
    auth.ensure_logged_in(remote)

  with get_storage(remote, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=MemoryCache(),
          outdir=options.target,
          require_command=False)
      # |rel| is already absolute: options.target was abspath'd above. The
      # previous code joined options.target with it a second time, which was a
      # redundant no-op; print it directly.
      rel = os.path.join(options.target, bundle.relative_cwd)
      print('To run this test please run from the directory %s:' % rel)
      print(' ' + ' '.join(bundle.command))

  return 0
1792
1793
def add_isolate_server_options(parser, add_indir):
  """Adds --isolate-server and --namespace options to parser.

  Includes --indir if desired.
  """
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
  if not add_indir:
    return
  parser.add_option(
      '--indir', metavar='DIR',
      help='Directory used to store the hashtable instead of using an '
           'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001813
1814
def process_isolate_server_options(parser, options):
  """Processes the --isolate-server and --indir options and aborts if neither is
  specified.
  """
  has_indir = hasattr(options, 'indir')
  # Validate that exactly one of --isolate-server / --indir is in use.
  if not options.isolate_server:
    if not has_indir:
      parser.error('--isolate-server is required.')
    elif not options.indir:
      parser.error('Use one of --indir or --isolate-server.')
  elif has_indir and options.indir:
    parser.error('Use only one of --indir or --isolate-server.')

  if options.isolate_server:
    parsed = urlparse.urlparse(options.isolate_server, 'https')
    if parsed.query:
      parser.error('--isolate-server doesn\'t support query parameter.')
    if parsed.fragment:
      parser.error('--isolate-server doesn\'t support fragment in the url.')
    # A bare host like 'foo.com' parses with netloc='' and path='foo.com';
    # move the host into the netloc slot where it belongs.
    fields = list(parsed)
    if not fields[1] and fields[2]:
      fields[1] = fields[2].rstrip('/')
      fields[2] = ''
    fields[2] = fields[2].rstrip('/')
    options.isolate_server = urlparse.urlunparse(fields)
    on_error.report_on_exception_exit(options.isolate_server)
    return

  # --indir mode: validate and normalize the local directory path.
  if file_path.is_url(options.indir):
    parser.error('Can\'t use an URL for --indir.')
  indir = unicode(options.indir).replace('/', os.path.sep)
  options.indir = os.path.abspath(
      os.path.normpath(os.path.join(os.getcwd(), indir)))
  if not os.path.isdir(options.indir):
    parser.error('Path given to --indir must exist.')
1853
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001854
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser that also wires in logging and authentication options."""

  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    # Expose --auth-* flags on every subcommand.
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    """Parses arguments, then applies the authentication options."""
    parsed_options, remaining = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, parsed_options)
    return parsed_options, remaining
1869
1870
def main(args):
  """Dispatches |args| to the matching CMD* subcommand and returns its exit
  code.
  """
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001874
1875
if __name__ == '__main__':
  # Normalize stdout/stderr encoding before any output happens.
  fix_encoding.fix_encoding()
  # Unbuffered output so progress lines appear immediately.
  tools.disable_buffering()
  # Enable ANSI color handling on Windows consoles.
  colorama.init()
  sys.exit(main(sys.argv[1:]))