#!/usr/bin/env python
# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.4.2'

import functools
import logging
import optparse
import os
import re
import signal
import sys
import tempfile
import threading
import time
import types
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import lru
from utils import net
from utils import on_error
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# larger the possibility it has changed). Then the first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/pre-upload',
# then the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here
# are a trade-off; the more per request, the lower the effect of HTTP round
# trip latency and TCP-level chattiness. On the other hand, larger values
# cause longer lookups, increasing the initial latency to start uploading,
# which is especially an issue for large files. This value is optimized for
# the "few thousands files to look up with minimal number of large files
# missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
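# For example (illustrative numbers): checking 240 items issues batches of
# 20, 20, 50, 50 and 50 items, then a final batch with the remaining 50;
# any further batches would contain up to 100 items each.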


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
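# For example (illustrative paths, POSIX separators): 'foo.pyc', 'bar.swp',
# '.git' and 'src/.svn' all match; 'foo.py' and '.gitignore' do not.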


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    data = stream.read(chunk_size)
    if not data:
      break
    yield data


def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  if not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
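
# A minimal round-trip sketch (illustrative, Python 2 str chunks):
#
#   chunks = list(zip_compress(['hello ', 'world']))
#   assert ''.join(zip_decompress(chunks)) == 'hello world'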


def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot, so strip it before comparing
  # against the dotless entries of ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
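
# For example (illustrative): get_zip_compression_level('video.mp4') == 0,
# while get_zip_compression_level('binary.dll') == 7.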


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      os.mkdir(os.path.join(base_directory, d))
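
# For example (illustrative): with files ['a/b/c.txt', 'a/d.txt'], the
# directory set expands to {'a', 'a/b'} and the directories are created in
# sorted order, parents first.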


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101


def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else os.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]
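
# A minimal usage sketch (illustrative; assumes a SHA-1 based namespace and
# an import of hashlib, which this module does not itself do):
#
#   item = BufferItem('some blob')
#   item.prepare(hashlib.sha1)
#   print item.digest, item.size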


class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total:      %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns an URL that can be used to fetch given item once it's uploaded.

    Note that if namespace uses compression, data at given URL is compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state

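# A minimal usage sketch (illustrative; the server URL is hypothetical and
# the namespace must exist on it):
#
#   storage = Storage(IsolateServer('https://isolate.example.com',
#                                   'default-gzip'))
#   with storage:
#     uploaded = storage.upload_items([FileItem('/path/to/some/file')])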

def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries


class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
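
# A minimal usage sketch (illustrative; assumes a LocalCache implementation
# |cache| from elsewhere and a 40 char hex digest taken from an .isolated
# file):
#
#   queue = FetchQueue(storage, cache)
#   queue.add('deadbeef' * 5, size=1024)
#   queue.wait(['deadbeef' * 5])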


class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious of compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns an URL that can be used to fetch an item with given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      An URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, data that should be pushed
    and data provided by the item is not the same. In that case |content| is
    not None and it yields chunks of compressed data (using item.content() as
    a source of original uncompressed data). This is implemented by Storage
    class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, upload_url, finalize_url, size):
    self.upload_url = upload_url
    self.finalize_url = finalize_url
    self.uploaded = False
    self.finalized = False
    self.size = size


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise isolated_format.MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.
    with self._lock:
      if self._server_caps is None:
        try:
          caps = net.url_read_json(
              url=self._base_url + '/content-gs/handshake',
              data=self._generate_handshake_request())
          if caps is None:
            raise isolated_format.MappingError('Failed to perform handshake.')
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise isolated_format.MappingError(
              'Invalid handshake response (%s): %s' % (
                  exc.__class__.__name__, exc))
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    assert isinstance(digest, basestring)
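    # The returned URL looks like (illustrative):
    #   https://isolate.example.com/content-gs/retrieve/default-gzip/<digest>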
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    connection = self.do_fetch(source_url, offset)
    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
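      # For example (illustrative): 'bytes 200-1023/1024' parses to
      # content_offset=200, last_byte_index=1023, size=1024.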
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    if isinstance(content, (basestring, list)):
      # Memory is already used, too late.
      with self._lock:
        self._memory_use += push_state.size
    else:
      # TODO(vadimsh): Do not read from |content| generator when retrying push.
      # If |content| is indeed a generator, it cannot be rewound back to the
      # beginning of the stream. A retry will find it exhausted. A possible
      # solution is to wrap |content| generator with some sort of caching
      # restartable generator. It should be done alongside streaming support
      # implementation.
      #
      # In theory, we should keep the generator, so that it is not serialized
      # in memory. Sadly net.HttpService.request() requires the body to be
      # serialized.
      assert isinstance(content, types.GeneratorType), repr(content)
      slept = False
      # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
      # One byte less than 512mb. This is to cope with incompressible content.
      max_size = int(sys.maxsize * 0.25)
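      # (On 32 bit Python, sys.maxsize is 2**31 - 1, so this is 512mb minus
      # one byte; on 64 bit Python it is large enough to never throttle.)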
      while True:
        with self._lock:
          # This is due to 32 bits python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since it's what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling one upload at once, we can survive. Otherwise, kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after failed finalization call
      # below, no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state.upload_url, content)
        if not success:
          raise IOError('Failed to upload a file %s to %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read(
            url=push_state.finalize_url,
            data='',
            content_type='application/json',
            method='POST')
        if response is None:
          raise IOError('Failed to finalize an upload of %s' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
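    # For example (illustrative):
    #   [{'h': '<40 char hex digest>', 's': 12345, 'i': 0}, ...]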
1137 body = [
1138 {
1139 'h': item.digest,
1140 's': item.size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001141 'i': int(item.high_priority),
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001142 } for item in items
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001143 ]
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001144
1145 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001146 self._base_url,
1147 self._namespace,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001148 urllib.quote(self._server_capabilities['access_token']))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001149
1150 # Response body is a list of push_urls (or null if file is already present).
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001151 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001152 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001153 response = net.url_read_json(url=query_url, data=body)
1154 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001155 raise isolated_format.MappingError(
1156 'Failed to execute /pre-upload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001157 if not isinstance(response, list):
1158 raise ValueError('Expecting response with json-encoded list')
1159 if len(response) != len(items):
1160 raise ValueError(
1161 'Incorrect number of items in the list, expected %d, '
1162 'but got %d' % (len(items), len(response)))
1163 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001164 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001165 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001166
1167 # Pick Items that are missing, attach _PushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001168 missing_items = {}
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001169 for i, push_urls in enumerate(response):
1170 if push_urls:
1171 assert len(push_urls) == 2, str(push_urls)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001172 missing_items[items[i]] = _IsolateServerPushState(
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001173 push_urls[0], push_urls[1], items[i].size)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001174 logging.info('Queried %d files, %d cache hit',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001175 len(items), len(items) - len(missing_items))
1176 return missing_items
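
  # An illustrative sketch of the /pre-upload exchange implemented by
  # contains() above (hash values are made up):
  #   request body:  [{'h': 'aa12...', 's': 1024, 'i': 0}, ...]
  #   response body: [null, ['<upload_url>', '<finalize_url>'], ...]
  # A null entry means the corresponding file is already on the server; a
  # pair of URLs means it is missing and must be pushed, then optionally
  # finalized.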

  def do_fetch(self, url, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    return net.url_open(
        url,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)
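
  # For example, do_fetch(url, offset=1024) resumes a partial transfer by
  # sending the header 'Range: bytes=1024-', while offset=0 sends no Range
  # header and fetches the file from the beginning.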

  def do_push(self, url, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to upload the data to.
      content: an iterable that yields 'str' chunks.

    Returns:
      True on success, False on failure.
    """
    # A cheesy way to avoid memcpy of a (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)
    response = net.url_read(
        url=url,
        data=content,
        content_type='application/octet-stream',
        method='PUT')
    return response is not None


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. The default value makes
          all mapped files read-only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # Let's not assume dict is thread safe.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Assemble the whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no filenode to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      os.chmod(dest, file_mode & self._file_mode_mask)
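
  # A minimal usage sketch (the digest value is made up):
  #   cache = MemoryCache()
  #   cache.write('d3adb33f', ['chunk1', 'chunk2'])
  #   assert cache.read('d3adb33f') == 'chunk1chunk2'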


class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, the cache may fill the disk unconditionally.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
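
  # For instance, policies mirroring the command line defaults in
  # add_cache_options() below would be:
  #   CachePolicies(
  #       max_cache_size=20*1024*1024*1024,  # 20GiB
  #       min_free_space=2*1024*1024*1024,   # 2GiB
  #       max_items=100000)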


class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as a json file.
  """
  STATE_FILE = 'state.json'

  def __init__(self, cache_dir, policies, hash_algo):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      hash_algo: hashing algorithm used.
    """
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)

    # All protected methods (starting with '_') except _path should be called
    # with this lock locked.
    self._lock = threading_utils.LockWithAssert()
    self._lru = lru.LRUDict()

    # Profiling values.
    self._added = []
    self._removed = []
    self._free_disk = 0

    with tools.Profiler('Setup'):
      with self._lock:
        self._load()

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) removed',
          len(self._removed), sum(self._removed) / 1024)
      logging.info(
          '        %8dkb free',
          self._free_disk / 1024)
    return False

  def cached_set(self):
    with self._lock:
      return self._lru.keys_set()

  def touch(self, digest, size):
    """Verifies that an actual file is valid.

    Note that it doesn't compute the hash, so the file could still be
    corrupted if its size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
    return True

  def evict(self, digest):
    with self._lock:
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def read(self, digest):
    with open(self._path(digest), 'rb') as f:
      return f.read()

  def write(self, digest, content):
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have its
    # write access bit removed, which would cause the file_write() call to
    # fail to open in write mode. Take no chance here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      #   1) Inside the |content| generator in case of network or unzipping
      #      errors.
      #   2) Inside file_write itself in case of disk IO errors.
      # In any case, delete the incomplete file and propagate the exception to
      # the caller; it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side effects since
    # the file node is modified, so every directory entry to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)

  def hardlink(self, digest, dest, file_mode):
    """Hardlinks the file to |dest|.

    Note that the file permission bits are on the file node, not the directory
    entry, so changing the access bit on any of the directory entries for the
    file node will affect them all.
    """
    path = self._path(digest)
    # TODO(maruel): file_path.HARDLINK_WITH_FALLBACK ?
    file_path.hardlink(path, dest)
    if file_mode is not None:
      # Ignores all other bits.
      os.chmod(dest, file_mode & 0500)

  def _load(self):
    """Loads state of the cache from json file."""
    self._lock.assert_locked()

    if not os.path.isdir(self.cache_dir):
      os.makedirs(self.cache_dir)
    else:
      # Make sure the cache is read-only.
      # TODO(maruel): Calculate the cost and optimize the performance
      # accordingly.
      file_path.make_tree_read_only(self.cache_dir)

    # Load state of the cache.
    if os.path.isfile(self.state_file):
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep broken state file.
        file_path.try_remove(self.state_file)

    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    unknown = []
    for filename in os.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        continue
      if filename in previous:
        previous.remove(filename)
        continue
      # An untracked file.
      if not isolated_format.is_valid_hash(filename, self.hash_algo):
        logging.warning('Removing unknown file %s from cache', filename)
        p = self._path(filename)
        if os.path.isdir(p):
          try:
            file_path.rmtree(p)
          except OSError:
            pass
        else:
          file_path.try_remove(p)
        continue
      # File that's not referenced in 'state.json'.
      # TODO(vadimsh): Verify its SHA1 matches file name.
      logging.warning('Adding unknown file %s to cache', filename)
      unknown.append(filename)

    if unknown:
      # Add as oldest files. They will be deleted eventually if not accessed.
      self._add_oldest_list(unknown)
      logging.warning('Added back %d unknown files', len(unknown))

    if previous:
      # Filter out entries that were not found.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)
    self._trim()

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if os.path.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if os.path.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know, making sure enough free space exists."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file()

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file()

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = False
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space = True
      self._remove_lru_file()
      self._free_disk = file_path.get_free_space(self.cache_dir)
    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if self.policies.max_cache_size:
        usage_percent = (
            100. * float(total_usage) / self.policies.max_cache_size)
      logging.warning(
          'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
          'cache (%.1f%% of its maximum capacity)',
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent)
    self._save()

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    digest, size = self._lru.pop_oldest()
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into LRU cache marking it as a newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = os.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)

  def _add_oldest_list(self, digests):
    """Adds a bunch of items into LRU cache marking them as oldest ones."""
    self._lock.assert_locked()
    pairs = []
    for digest in digests:
      size = os.stat(self._path(digest)).st_size
      self._added.append(size)
      pairs.append((digest, size))
    self._lru.batch_insert_oldest(pairs)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes a cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        size = os.stat(self._path(digest)).st_size
      file_path.try_remove(self._path(digest))
      self._removed.append(size)
    except OSError as e:
      logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
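
  # A minimal usage sketch (the path and namespace are illustrative):
  #   policies = CachePolicies(
  #       max_cache_size=0, min_free_space=0, max_items=0)  # No trimming.
  #   with DiskCache('/tmp/isolate-cache', policies,
  #                  isolated_format.get_hash_algo('default-gzip')) as cache:
  #     print(cache.cached_set())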


class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated files.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left side of the tree have been fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts an asynchronous fetch of all data
    files by adding them to |fetch_queue|. It doesn't wait for data files to
    finish fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching the root *.isolated file (single file, not the whole
    # bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, which waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now, and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def set_storage_api_class(cls):
  """Replaces the StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)


def get_storage(url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(get_storage_api(url, namespace))
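
# A minimal usage sketch (the server URL is illustrative):
#   with get_storage('https://isolate.example.com', 'default-gzip') as s:
#     s.upload_items([...])  # FileItem instances, as in upload_tree() below.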


def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given URL.

  Arguments:
    base_url: The URL of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into a list of FileItem objects, skip duplicates.
  # Filter out symlinks, since they are not represented by items on isolate
  # server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)
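
# An illustrative |infiles| entry for upload_tree() (values are made up):
#   ('/abs/path/foo.txt', {'h': '8b1a9953...', 's': 123, 'priority': '0'})
# Entries carrying an 'l' key (symlinks) are skipped since symlinks are not
# stored as items on the server.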


def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then downloads all files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        try:
          isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

    # Load all *.isolated and start loading the rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)
    if require_command and not bundle.command:
      # TODO(vadimsh): All fetch operations are already enqueued and there's no
      # easy way to cancel them.
      raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

  # Cache could evict some items we just tried to fetch, it's a fatal error.
  if not fetch_queue.verify_all_cached():
    raise isolated_format.MappingError(
        'Cache is too small to hold all requested files')
  return bundle


def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
    relpath: isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
      FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          high_priority=relpath.endswith('.isolated'))
      for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded, but in
    # practice we don't care much.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    if tempdir:
      file_path.rmtree(tempdir)


def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, files, blacklist)
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0
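
# An example invocation (the server URL and directory are illustrative):
#   isolateserver.py archive -I https://isolate.example.com \
#       --namespace default-gzip some_directory
# It prints one '<hash> <path>' line per argument processed.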


def CMDdownload(parser, args):
  """Downloads data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  cache = process_cache_options(options)
  options.target = os.path.abspath(options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching the whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            require_command=False)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0
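
# Example invocations (the hash and server URL are illustrative):
#   isolateserver.py download -I https://isolate.example.com \
#       -s 4be3cb0a1be8... -t /tmp/out --cache /tmp/isolate-cache
#   isolateserver.py download -I https://isolate.example.com \
#       -f 4be3cb0a1be8... out.bin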


def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexps to use as a blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(parser, options, set_exception_handler):
  """Processes the --isolate-server option and aborts if not specified.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    parser.error('--isolate-server is required.')
  try:
    options.isolate_server = net.fix_url(options.isolate_server)
  except ValueError as e:
    parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=20*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # |options.cache| path may not exist until DiskCache() instance is created.
    return DiskCache(
        os.path.abspath(options.cache),
        policies,
        isolated_format.get_hash_algo(options.namespace))
  else:
    return MemoryCache()


class OptionParserIsolateServer(tools.OptionParserWithLogging):
  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))