blob: 074422ee44576289fdb2b24a625e2507724b4572 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04008__version__ = '0.3.4'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040012import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000014import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040015import signal
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050017import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000018import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000019import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000020import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050021import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000022import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000023
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000024from third_party import colorama
25from third_party.depot_tools import fix_encoding
26from third_party.depot_tools import subcommand
27
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050028from utils import file_path
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040029from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000030from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040031from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000032from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000033from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000034
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080035import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040036import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080037
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000038
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
# NOTE(review): get_zip_compression_level() compares os.path.splitext() output
# against this list; splitext keeps the leading dot, these entries have none.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of paths that are not archived by default.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
97
98
class Error(Exception):
  """Generic runtime error."""
103
class Aborted(Error):
  """Operation aborted, e.g. after a termination signal was caught."""
108
def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them.

  Stops at the first empty (falsy) read, i.e. at end of stream.
  """
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
116
117
def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as stream:
    if offset:
      stream.seek(offset)
    chunk = stream.read(chunk_size)
    while chunk:
      yield chunk
      chunk = stream.read(chunk_size)
128
129
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  # Guard against a bare filename: os.path.dirname() then returns '' and
  # os.makedirs('') raises OSError even though nothing needs to be created.
  if filedir and not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000148
149
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  # Flush whatever the compressor is still buffering.
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
160
161
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  bytes_in = 0
  try:
    for piece in content_generator:
      bytes_in += len(piece)
      # Feed the fresh piece, then keep draining whatever zlib withheld in
      # unconsumed_tail so no single output exceeds |chunk_size|.
      pending = piece
      while True:
        out = inflater.decompress(pending, chunk_size)
        if out:
          yield out
        pending = inflater.unconsumed_tail
        if not pending:
          break
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (bytes_in, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
192
193
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store only) for file types that are already compressed, 7 for
  everything else.
  """
  # os.path.splitext() keeps the leading dot ('.jpg') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions ('jpg'); strip it so the
  # membership test can actually match. Without this, already-compressed
  # files were needlessly re-deflated at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
199
200
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every directory along with all of its ancestors.
  directories = set()
  for f in files:
    path = os.path.dirname(f)
    while path and path not in directories:
      directories.add(path)
      path = os.path.dirname(path)
  # Sorted order guarantees parents are created before their children.
  for d in sorted(directories):
    os.mkdir(os.path.join(base_directory, d))
213
214
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files.

  |files| is an iterable of (path, properties) pairs; only entries whose
  properties contain an 'l' key (the link target) are processed.
  """
  if sys.platform == 'win32':
    # TODO(maruel): Create symlink via the win32 api.
    for filepath, properties in files:
      if 'l' in properties:
        logging.warning('Ignoring symlink %s', filepath)
    return
  for filepath, properties in files:
    if 'l' in properties:
      outfile = os.path.join(base_directory, filepath)
      # os.symlink() doesn't exist on Windows.
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000227
228
def is_valid_file(filepath, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    # No expected size to compare against; only check existence.
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size == actual_size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
243
244
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Hex digest of the content; None until computed by prepare().
    self.digest = digest
    # Content length in bytes; None until computed by prepare().
    self.size = size
    # Items flagged high priority are scheduled ahead of others by Storage.
    self.high_priority = high_priority
    # zlib compression level used when the namespace compresses data.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size is already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    length = 0
    for chunk in self.content():
      hasher.update(chunk)
      length += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = length
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000284
285
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Stat the file up front when the caller didn't supply the size.
    if size is None:
      size = os.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    """Lazily yields the file content in chunks."""
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000303
304
class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Size is known upfront from len(buf); digest is left for prepare().
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    # Returned as a list (not a generator) so it can be iterated repeatedly.
    return [self.buffer]
314
315
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    # Low-level backend (e.g. isolate server client); its namespace determines
    # the hashing and compression configuration below.
    self._storage_api = storage_api
    # Whether content must be deflated before upload / inflated after fetch.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily by the corresponding properties.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Flipped (unlocked, see abort()) once a termination signal is caught.
    self._aborted = False
    # Signal handlers replaced in __enter__, restored in __exit__.
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """Location of a backing store that this class is using.

    Exact meaning depends on the storage_api type. For IsolateServer it is
    an URL of the isolate server, for FileSystem it is a path in file system.
    """
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping. Created on first use."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError. Lazy."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish, then tears down both pools."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging message
    # will be printed twice. Not a big deal. In other places it is assumed that
    # unprotected reads and writes to _aborted are serializable (it is true
    # for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface.

    Installs SIGINT/SIGTERM handlers that route to abort(); the previous
    handlers are saved for restoration in __exit__.
    """
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface. Joins threads and restores signal handlers."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns an URL that can be used to fetch given item once it's uploaded.

    Note that if namespace uses compression, data at given URL is compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in. Each pull() returns a dict mapping a
    # missing Item to its storage-specific push_state.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000645
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000646
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batches_done = 0
  # Batch size schedule comes from ITEMS_PER_CONTAINS_QUERIES; once past the
  # end of that tuple, the last entry keeps being reused.
  limit = ITEMS_PER_CONTAINS_QUERIES[0]
  current = []
  # Largest items are grouped into the earliest (smallest) batches.
  for item in sorted(items, key=lambda i: i.size, reverse=True):
    current.append(item)
    if len(current) < limit:
      continue
    yield current
    batches_done += 1
    limit = ITEMS_PER_CONTAINS_QUERIES[
        min(batches_done, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
    current = []
  if current:
    yield current
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000673
674
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  Manages multiple concurrent fetch operations, acting as a bridge between
  Storage and LocalCache so that neither needs to depend on the other.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # A fetch for this digest is already in flight; nothing to do.
    if digest in self._pending:
      return

    # Record the access so verify_all_cached() can later confirm the item is
    # still present in the cache.
    self._accessed.add(digest)

    # Cache hit: only the item's LRU position needs refreshing.
    if digest in self._fetched:
      if self.cache.touch(digest, size):
        # touch() returned True: present in cache and not corrupted.
        return
      # Corrupted entry: drop it and fall through to re-download.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Kick off the download; completion is reported through self._channel.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Anything already fetched can be returned immediately.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Every requested digest must already be in flight.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Drain completions until one of the requested digests shows up.
    while self._pending:
      done = self._channel.pull()
      self._pending.remove(done)
      self._fetched.add(done)
      if done in digests:
        return done

    # Unreachable given the assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as src:
      blob = src.read()
    digest = algo(blob).hexdigest()
    self.cache.write(digest, [blob])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Number of items whose fetch has not completed yet."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if every accessed item is still present in the cache."""
    return self._accessed.issubset(self.cache.cached_set())
770
771
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    """
    Arguments:
      stream: iterable yielding str chunks of the fetched item.
      expected_size: total expected size in bytes, or UNKNOWN_FILE_SIZE to
          skip size verification.
    """
    self.stream = stream
    self.expected_size = expected_size
    # Number of bytes seen so far; updated by _inspect_chunk().
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)
    elif (self.expected_size != UNKNOWN_FILE_SIZE and
          self.expected_size != 0):
      # The stream produced no chunks at all while a non-empty item was
      # expected. Without this check an empty (truncated) stream would pass
      # verification regardless of |expected_size|, since _inspect_chunk()
      # is never reached.
      raise IOError('Incorrect file size: expected %d, got 0' %
                    self.expected_size)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer.

    Raises IOError when the last chunk completes the stream at a size
    different from |expected_size|.
    """
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
818
819
class StorageApi(object):
  """Base interface for low-level storage backends.

  Implementations know nothing about compression or hashing schemes; those
  concerns are handled by the higher level Storage class.

  Client code should normally go through Storage rather than a StorageApi
  instance directly, since Storage adds compression and upload optimizations
  on top.
  """

  @property
  def location(self):
    """Location of the backing store this instance is using.

    The exact meaning depends on the implementation: for IsolateServer it is
    the isolate server URL, for FileSystem it is a file system path.
    """
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace this storage operates in.

    The namespace indirectly defines the hashing scheme and compression
    method used.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content in chunks.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch
          from.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads |item|, optionally using |content| as the data source.

    An |item| MUST first go through a 'contains' call to obtain |push_state|
    before it can be pushed to the storage. Typical usage:

      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the bytes to upload differ
    from the bytes the item provides. In that case |content| is not None and
    yields chunks of compressed data (using item.content() as the source of
    original uncompressed data). This is arranged by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by a 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks which of |items| are missing and prepares them for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict mapping each missing Item to an opaque push state object to be
      passed to 'push'. See the 'push' doc string.
    """
    raise NotImplementedError()
908
909
class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, upload_url, finalize_url):
    # URL the item's body is PUT to.
    self.upload_url = upload_url
    # URL POSTed to once the upload completed; may be falsy to skip finalize.
    self.finalize_url = finalize_url
    # Progress flags; IsolateServer.push uses them to make retries skip steps
    # that already succeeded.
    self.uploaded = False
    self.finalized = False
921
922
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    """
    Arguments:
      base_url: base URL of the isolate server, must start with 'http'.
      namespace: isolate namespace to work within.
    """
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    # Protects lazy initialization of |_server_caps| in _server_capabilities.
    self._lock = threading.Lock()
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise isolated_format.MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    The result is cached in |_server_caps|; the handshake runs at most once.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.
    with self._lock:
      if self._server_caps is None:
        try:
          caps = net.url_read_json(
              url=self._base_url + '/content-gs/handshake',
              data=self._generate_handshake_request())
          if caps is None:
            raise isolated_format.MappingError('Failed to perform handshake.')
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise isolated_format.MappingError(
              'Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
      return self._server_caps

  @property
  def location(self):
    """URL of the isolate server this instance talks to."""
    return self._base_url

  @property
  def namespace(self):
    """Isolate namespace used by this storage."""
    return self._namespace

  def get_fetch_url(self, digest):
    """Returns the /content-gs/retrieve URL for the item with |digest|."""
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    """Yields chunks of the item's content, optionally resuming at |offset|.

    Arguments:
      digest: hex digest of the item to download.
      offset: offset (in bytes) to resume the download from; when non-zero a
          Range request is issued and the server's Content-Range is validated.

    Raises:
      IOError if the request fails or the server mishandles the Range header.
    """
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    connection = net.url_open(
        source_url,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    """Uploads |item| via a PUT, then optionally finalizes it via a POST.

    See StorageApi.push for the generic contract. |push_state| must come from
    a previous 'contains' call; its 'uploaded' and 'finalized' flags let a
    retry skip steps that already succeeded.

    Raises:
      IOError if the upload or the finalization request fails.
    """
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content

    # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, push_state.upload_url))
      push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      # TODO(maruel): Fix the server to accept propery data={} so
      # url_read_json() can be used.
      response = net.url_read(
          url=push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      push_state.finalized = True

  def contains(self, items):
    """Queries the /pre-upload endpoint for which of |items| are missing.

    Returns:
      A dict mapping each missing Item to an _IsolateServerPushState holding
      its upload and finalize URLs.

    Raises:
      MappingError if the query fails or the response is malformed.
    """
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.high_priority),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self._base_url,
        self._namespace,
        urllib.quote(self._server_capabilities['access_token']))

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute /pre-upload query')
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        missing_items[items[i]] = _IsolateServerPushState(
            push_urls[0], push_urls[1])
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001157
1158
class FileSystem(StorageApi):
  """StorageApi implementation that fetches data from the file system.

  The common use case is a NFS/CIFS file server that is mounted locally that is
  used to fetch the file on a local partition.
  """

  # Sentinel returned by 'contains' and required by 'push'. It forces callers
  # to go through 'contains' first; naively passing None to 'push' would trip
  # the assert there.
  _DUMMY_PUSH_STATE = object()

  def __init__(self, base_path, namespace):
    super(FileSystem, self).__init__()
    self._base_path = base_path
    self._namespace = namespace

  @property
  def location(self):
    return self._base_path

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    # The file system protocol has no URL notion.
    return None

  def fetch(self, digest, offset=0):
    assert isinstance(digest, basestring)
    return file_read(os.path.join(self._base_path, digest), offset=offset)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert push_state is self._DUMMY_PUSH_STATE
    if content is None:
      content = item.content()
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      # Avoid iterating a str character by character.
      content = [content]
    file_write(os.path.join(self._base_path, item.digest), content)

  def contains(self, items):
    assert all(i.digest is not None and i.size is not None for i in items)
    missing = {}
    for item in items:
      if not os.path.exists(os.path.join(self._base_path, item.digest)):
        missing[item] = self._DUMMY_PUSH_STATE
    return missing
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001207
1208
class LocalCache(object):
  """Local store for objects fetched through Storage.

  Instances may be used concurrently from several threads, so implementations
  should guard their internal state with a lock.
  """
  # Directory holding the cache; None when the cache is not disk backed.
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a new set containing the digests of all cached items."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Checks an item for corruption and refreshes its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes the item from the cache, if present."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns the content of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Consumes the |content| generator and stores the data in the cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()
1260
1261
class MemoryCache(LocalCache):
  """LocalCache implementation that keeps everything in a dict in memory."""

  def __init__(self, file_mode_mask=0o500):
    """Args:
      file_mode_mask: bit mask AND'ed with file modes passed to hardlink().
        The default strips write bits, making mapped files read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # A plain dict is not assumed to be thread safe; guard it with a lock.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    # Size is ignored; only presence of the digest is checked.
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Join the stream into one blob outside of the critical section.
    blob = ''.join(content)
    with self._lock:
      self._contents[digest] = blob

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no filenode to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      # Apply the mask so the resulting file permissions are restricted.
      os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001303
1304
class CachePolicies(object):
  """Simple value object holding the retention limits for a local cache."""

  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, it unconditionally fill the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    # A value of 0 for any of these means "limit disabled".
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
1319
1320
1321class DiskCache(LocalCache):
1322 """Stateful LRU cache in a flat hash table in a directory.
1323
1324 Saves its state as json file.
1325 """
1326 STATE_FILE = 'state.json'
1327
1328 def __init__(self, cache_dir, policies, hash_algo):
1329 """
1330 Arguments:
1331 cache_dir: directory where to place the cache.
1332 policies: cache retention policies.
1333 algo: hashing algorithm used.
1334 """
1335 super(DiskCache, self).__init__()
1336 self.cache_dir = cache_dir
1337 self.policies = policies
1338 self.hash_algo = hash_algo
1339 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
1340
1341 # All protected methods (starting with '_') except _path should be called
1342 # with this lock locked.
1343 self._lock = threading_utils.LockWithAssert()
1344 self._lru = lru.LRUDict()
1345
1346 # Profiling values.
1347 self._added = []
1348 self._removed = []
1349 self._free_disk = 0
1350
1351 with tools.Profiler('Setup'):
1352 with self._lock:
1353 self._load()
1354
1355 def __enter__(self):
1356 return self
1357
1358 def __exit__(self, _exc_type, _exec_value, _traceback):
1359 with tools.Profiler('CleanupTrimming'):
1360 with self._lock:
1361 self._trim()
1362
1363 logging.info(
1364 '%5d (%8dkb) added',
1365 len(self._added), sum(self._added) / 1024)
1366 logging.info(
1367 '%5d (%8dkb) current',
1368 len(self._lru),
1369 sum(self._lru.itervalues()) / 1024)
1370 logging.info(
1371 '%5d (%8dkb) removed',
1372 len(self._removed), sum(self._removed) / 1024)
1373 logging.info(
1374 ' %8dkb free',
1375 self._free_disk / 1024)
1376 return False
1377
1378 def cached_set(self):
1379 with self._lock:
1380 return self._lru.keys_set()
1381
1382 def touch(self, digest, size):
1383 """Verifies an actual file is valid.
1384
1385 Note that is doesn't compute the hash so it could still be corrupted if the
1386 file size didn't change.
1387
1388 TODO(maruel): More stringent verification while keeping the check fast.
1389 """
1390 # Do the check outside the lock.
1391 if not is_valid_file(self._path(digest), size):
1392 return False
1393
1394 # Update it's LRU position.
1395 with self._lock:
1396 if digest not in self._lru:
1397 return False
1398 self._lru.touch(digest)
1399 return True
1400
1401 def evict(self, digest):
1402 with self._lock:
1403 self._lru.pop(digest)
1404 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1405
1406 def read(self, digest):
1407 with open(self._path(digest), 'rb') as f:
1408 return f.read()
1409
1410 def write(self, digest, content):
1411 path = self._path(digest)
1412 # A stale broken file may remain. It is possible for the file to have write
1413 # access bit removed which would cause the file_write() call to fail to open
1414 # in write mode. Take no chance here.
1415 file_path.try_remove(path)
1416 try:
1417 size = file_write(path, content)
1418 except:
1419 # There are two possible places were an exception can occur:
1420 # 1) Inside |content| generator in case of network or unzipping errors.
1421 # 2) Inside file_write itself in case of disk IO errors.
1422 # In any case delete an incomplete file and propagate the exception to
1423 # caller, it will be logged there.
1424 file_path.try_remove(path)
1425 raise
1426 # Make the file read-only in the cache. This has a few side-effects since
1427 # the file node is modified, so every directory entries to this file becomes
1428 # read-only. It's fine here because it is a new file.
1429 file_path.set_read_only(path, True)
1430 with self._lock:
1431 self._add(digest, size)
1432
1433 def hardlink(self, digest, dest, file_mode):
1434 """Hardlinks the file to |dest|.
1435
1436 Note that the file permission bits are on the file node, not the directory
1437 entry, so changing the access bit on any of the directory entries for the
1438 file node will affect them all.
1439 """
1440 path = self._path(digest)
1441 # TODO(maruel): file_path.HARDLINK_WITH_FALLBACK ?
1442 file_path.hardlink(path, dest)
1443 if file_mode is not None:
1444 # Ignores all other bits.
1445 os.chmod(dest, file_mode & 0500)
1446
1447 def _load(self):
1448 """Loads state of the cache from json file."""
1449 self._lock.assert_locked()
1450
1451 if not os.path.isdir(self.cache_dir):
1452 os.makedirs(self.cache_dir)
1453 else:
1454 # Make sure the cache is read-only.
1455 # TODO(maruel): Calculate the cost and optimize the performance
1456 # accordingly.
1457 file_path.make_tree_read_only(self.cache_dir)
1458
1459 # Load state of the cache.
1460 if os.path.isfile(self.state_file):
1461 try:
1462 self._lru = lru.LRUDict.load(self.state_file)
1463 except ValueError as err:
1464 logging.error('Failed to load cache state: %s' % (err,))
1465 # Don't want to keep broken state file.
1466 file_path.try_remove(self.state_file)
1467
1468 # Ensure that all files listed in the state still exist and add new ones.
1469 previous = self._lru.keys_set()
1470 unknown = []
1471 for filename in os.listdir(self.cache_dir):
1472 if filename == self.STATE_FILE:
1473 continue
1474 if filename in previous:
1475 previous.remove(filename)
1476 continue
1477 # An untracked file.
1478 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1479 logging.warning('Removing unknown file %s from cache', filename)
1480 file_path.try_remove(self._path(filename))
1481 continue
1482 # File that's not referenced in 'state.json'.
1483 # TODO(vadimsh): Verify its SHA1 matches file name.
1484 logging.warning('Adding unknown file %s to cache', filename)
1485 unknown.append(filename)
1486
1487 if unknown:
1488 # Add as oldest files. They will be deleted eventually if not accessed.
1489 self._add_oldest_list(unknown)
1490 logging.warning('Added back %d unknown files', len(unknown))
1491
1492 if previous:
1493 # Filter out entries that were not found.
1494 logging.warning('Removed %d lost files', len(previous))
1495 for filename in previous:
1496 self._lru.pop(filename)
1497 self._trim()
1498
1499 def _save(self):
1500 """Saves the LRU ordering."""
1501 self._lock.assert_locked()
1502 if sys.platform != 'win32':
1503 d = os.path.dirname(self.state_file)
1504 if os.path.isdir(d):
1505 # Necessary otherwise the file can't be created.
1506 file_path.set_read_only(d, False)
1507 if os.path.isfile(self.state_file):
1508 file_path.set_read_only(self.state_file, False)
1509 self._lru.save(self.state_file)
1510
1511 def _trim(self):
1512 """Trims anything we don't know, make sure enough free space exists."""
1513 self._lock.assert_locked()
1514
1515 # Ensure maximum cache size.
1516 if self.policies.max_cache_size:
1517 total_size = sum(self._lru.itervalues())
1518 while total_size > self.policies.max_cache_size:
1519 total_size -= self._remove_lru_file()
1520
1521 # Ensure maximum number of items in the cache.
1522 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1523 for _ in xrange(len(self._lru) - self.policies.max_items):
1524 self._remove_lru_file()
1525
1526 # Ensure enough free space.
1527 self._free_disk = file_path.get_free_space(self.cache_dir)
1528 trimmed_due_to_space = False
1529 while (
1530 self.policies.min_free_space and
1531 self._lru and
1532 self._free_disk < self.policies.min_free_space):
1533 trimmed_due_to_space = True
1534 self._remove_lru_file()
1535 self._free_disk = file_path.get_free_space(self.cache_dir)
1536 if trimmed_due_to_space:
1537 total_usage = sum(self._lru.itervalues())
1538 usage_percent = 0.
1539 if total_usage:
1540 usage_percent = 100. * self.policies.max_cache_size / float(total_usage)
1541 logging.warning(
1542 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1543 'cache (%.1f%% of its maximum capacity)',
1544 self._free_disk / 1024.,
1545 total_usage / 1024.,
1546 usage_percent)
1547 self._save()
1548
1549 def _path(self, digest):
1550 """Returns the path to one item."""
1551 return os.path.join(self.cache_dir, digest)
1552
1553 def _remove_lru_file(self):
1554 """Removes the last recently used file and returns its size."""
1555 self._lock.assert_locked()
1556 digest, size = self._lru.pop_oldest()
1557 self._delete_file(digest, size)
1558 return size
1559
1560 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1561 """Adds an item into LRU cache marking it as a newest one."""
1562 self._lock.assert_locked()
1563 if size == UNKNOWN_FILE_SIZE:
1564 size = os.stat(self._path(digest)).st_size
1565 self._added.append(size)
1566 self._lru.add(digest, size)
1567
1568 def _add_oldest_list(self, digests):
1569 """Adds a bunch of items into LRU cache marking them as oldest ones."""
1570 self._lock.assert_locked()
1571 pairs = []
1572 for digest in digests:
1573 size = os.stat(self._path(digest)).st_size
1574 self._added.append(size)
1575 pairs.append((digest, size))
1576 self._lru.batch_insert_oldest(pairs)
1577
1578 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1579 """Deletes cache file from the file system."""
1580 self._lock.assert_locked()
1581 try:
1582 if size == UNKNOWN_FILE_SIZE:
1583 size = os.stat(self._path(digest)).st_size
1584 file_path.try_remove(self._path(digest))
1585 self._removed.append(size)
1586 except OSError as e:
1587 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1588
1589
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    # Command to run, as a list of strings (empty until fetch() finds one).
    self.command = []
    # Merged mapping of relative file path -> properties dict from all
    # traversed .isolated files; populated by fetch().
    self.files = {}
    # Tri-state: None until a traversed .isolated file sets it.
    self.read_only = None
    # Working directory relative to the output dir; '' after fetch() if unset.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already being fetched.
    processed = set()

    def retrieve_async(isolated_file):
      # Enqueues one *.isolated file for fetching, guarding against cycles.
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties. First node in traversal order to set a property wins.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1711
1712
def get_storage_api(file_or_url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for
  a better alternative.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of isolate
        service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  # URLs get the networked backend, anything else is treated as a local path.
  storage_cls = (
      IsolateServer if file_path.is_url(file_or_url) else FileSystem)
  return storage_cls(file_or_url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001734
1735
def get_storage(file_or_url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of isolate
        service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  # Wrap the low-level StorageApi into the high-level Storage facade.
  storage_api = get_storage_api(file_or_url, namespace)
  return Storage(storage_api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001750
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001751
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into FileItem objects, dropping duplicates and symlinks.
  # Symlinks are skipped because they are not represented by items on the
  # isolate server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    if 'l' in metadata or filepath in seen:
      skipped += 1
      continue
    seen.add(filepath)
    items.append(
        FileItem(
            path=filepath,
            digest=metadata['h'],
            size=metadata['s'],
            high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001781
1782
def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.

  Raises:
    isolated_format.MappingError: if |isolated_hash| is neither a valid hash
        nor a readable local file, or if the cache cannot hold all files.
    isolated_format.IsolatedError: if require_command is set but no command
        was found in the fetched *.isolated files.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  # |cache| as a context manager; on exit it trims itself to its policies.
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        try:
          isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intent to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)
      if require_command and not bundle.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001870
1871
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  relpaths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  # Build relpath -> metadata mapping; the timestamp ('t') is dropped since
  # it is not wanted in the resulting .isolated file.
  metadata = {}
  for relpath in relpaths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, False, algo)
    meta.pop('t')
    metadata[relpath] = meta
  # Only entries with a hash ('h') become uploadable items; *.isolated files
  # are flagged high priority.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
1893
1894
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    List of (hash, path) pairs, one per entry in |files|, in the same order.

  Raises:
    Error: on duplicate entries, on a path that is neither a file nor a
        directory, or when processing an entry fails with OSError.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          # Upload the directory's files plus the generated .isolated itself;
          # the directory is represented by the .isolated's hash.
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded but we don't
    # much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    # Clean up the temporary .isolated files, even on failure.
    if tempdir:
      file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001964
1965
def archive(out, namespace, files, blacklist):
  """Uploads |files| (or stdin file list) to |out| and prints their hashes."""
  # '-' means the file list arrives on stdin, one path per line.
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    uploaded = archive_files_to_storage(storage, files, blacklist)
    print('\n'.join('%s %s' % (digest, path) for digest, path in uploaded))
1978
1979
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created the whole directory
  is uploaded. Then this .isolated file can be included in another one to run
  commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  # NOTE: the docstring above doubles as the subcommand's --help text.
  add_isolate_server_options(parser, False)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  # Only a networked isolate server needs authentication; a local file path
  # backend does not.
  if file_path.is_url(options.isolate_server):
    auth.ensure_logged_in(options.isolate_server)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    # Report upload failures through the option parser's error mechanism.
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002003
2004
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser, True)
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # --isolated and --file are mutually exclusive and exactly one is required.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  cache = process_cache_options(options)
  options.target = os.path.abspath(options.target)

  remote = options.isolate_server or options.indir
  if file_path.is_url(remote):
    auth.ensure_logged_in(remote)

  with get_storage(remote, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      # Maps digest -> destination so completions can be matched back to the
      # file they were requested for.
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            require_command=False)
        if bundle.command:
          # |rel| is already an absolute path since |options.target| was made
          # absolute above; the previous code redundantly joined it with the
          # target a second time.
          rel = os.path.join(options.target, bundle.relative_cwd)
          print('To run this test please run from the directory %s:' % rel)
          print(' ' + ' '.join(bundle.command))

  return 0
2072
2073
def add_archive_options(parser):
  """Registers the upload filtering options shared by archiving commands."""
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter '
           'when uploading directories')
2080
2081
def add_isolate_server_options(parser, add_indir):
  """Adds --isolate-server and --namespace options to parser.

  Includes --indir if desired.
  """
  # Pick up the default server from the environment so bots don't need to
  # pass it explicitly on every invocation.
  default_server = os.environ.get('ISOLATE_SERVER', '')
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=default_server,
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify '
           'https://, this is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
  if add_indir:
    parser.add_option(
        '--indir', metavar='DIR',
        help='Directory used to store the hashtable instead of using an '
             'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002101
2102
def process_isolate_server_options(parser, options):
  """Processes the --isolate-server and --indir options and aborts if neither is
  specified.

  On success, either options.isolate_server is normalized into a canonical
  scheme://netloc URL, or options.indir is normalized into an absolute native
  path. Any invalid combination aborts via parser.error() (which exits).
  """
  # --indir is only registered when add_isolate_server_options() was called
  # with add_indir=True, so its very presence on |options| is conditional.
  has_indir = hasattr(options, 'indir')
  if not options.isolate_server:
    if not has_indir:
      parser.error('--isolate-server is required.')
    elif not options.indir:
      parser.error('Use one of --indir or --isolate-server.')
  else:
    if has_indir and options.indir:
      parser.error('Use only one of --indir or --isolate-server.')

  if options.isolate_server:
    # Default the scheme to https when the user omitted it.
    parts = urlparse.urlparse(options.isolate_server, 'https')
    if parts.query:
      parser.error('--isolate-server doesn\'t support query parameter.')
    if parts.fragment:
      parser.error('--isolate-server doesn\'t support fragment in the url.')
    # urlparse('foo.com') will result in netloc='', path='foo.com', which is not
    # what is desired here.
    new = list(parts)
    if not new[1] and new[2]:
      # Promote the path to netloc so 'foo.com' round-trips to
      # 'https://foo.com' instead of 'https:///foo.com'.
      new[1] = new[2].rstrip('/')
      new[2] = ''
    # Strip any trailing slash from the remaining path component.
    new[2] = new[2].rstrip('/')
    options.isolate_server = urlparse.urlunparse(new)
    # NOTE(review): presumably arms crash reporting against this server;
    # confirm against on_error's implementation.
    on_error.report_on_exception_exit(options.isolate_server)
    return

  # --indir mode: the "server" is a local directory holding the hashtable.
  if file_path.is_url(options.indir):
    parser.error('Can\'t use an URL for --indir.')
  # Normalize to an absolute path with native separators.
  options.indir = unicode(options.indir).replace('/', os.path.sep)
  options.indir = os.path.abspath(
      os.path.normpath(os.path.join(os.getcwd(), options.indir)))
  if not os.path.isdir(options.indir):
    parser.error('Path given to --indir must exist.')
2141
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002142
def add_cache_options(parser):
  """Registers the local disk cache tuning options on |parser|."""
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  # The three trimming thresholds all share the same shape: an integer flag
  # with a default value.
  thresholds = [
      ('--max-cache-size', 20*1024*1024*1024,
       'Trim if the cache gets larger than this value, default=%default'),
      ('--min-free-space', 2*1024*1024*1024,
       'Trim if disk free space becomes lower than this value, '
       'default=%default'),
      ('--max-items', 100000,
       'Trim if more than this number of items are in the cache '
       'default=%default'),
  ]
  for flag, default, help_text in thresholds:
    group.add_option(
        flag, type='int', metavar='NNN', default=default, help=help_text)
  parser.add_option_group(group)
2170
2171
def process_cache_options(options):
  """Builds the cache object matching the parsed cache options.

  Returns a DiskCache when --cache was specified, a MemoryCache otherwise.
  """
  if not options.cache:
    return MemoryCache()
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  # |options.cache| path may not exist until DiskCache() instance is created.
  return DiskCache(
      os.path.abspath(options.cache),
      policies,
      isolated_format.get_hash_algo(options.namespace))
2184
2185
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser shared by all isolateserver subcommands.

  Layers authentication options on top of the standard logging options and
  validates them as part of parse_args().
  """

  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    # Validate auth flags right after parsing so every command gets it for
    # free.
    options, leftover = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, leftover
2200
2201
def main(args):
  """Dispatches execution to the CMD* handler named by the first argument."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002205
2206
if __name__ == '__main__':
  # Order matters: make stdout/stderr encoding sane before anything prints.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # Enables console color escape code handling (notably on Windows).
  colorama.init()
  sys.exit(main(sys.argv[1:]))