#!/usr/bin/env python
# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.4.3'

import base64
import functools
import logging
import optparse
import os
import re
import signal
import sys
import tempfile
import threading
import time
import types
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import lru
from utils import net
from utils import on_error
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server for per /pre-upload query.
# All files are sorted by the likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). The first ITEMS_PER_CONTAINS_QUERIES[0] files are
# taken and sent to '/pre-upload', then the next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. The numbers here are a trade-off; the more per request, the lower
# the effect of HTTP round trip latency and TCP-level chattiness. On the other
# hand, larger values cause longer lookups, increasing the initial latency to
# start uploading, which is especially an issue for large files. This value is
# optimized for the "few thousand files to look up with a minimal number of
# large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
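

# Illustrative sketch (not in the original file): one way the DEFAULT_BLACKLIST
# regexps can be applied to a relative path. The helper name below is
# hypothetical.
def _example_is_blacklisted(relpath, blacklist=DEFAULT_BLACKLIST):
  """Returns True if |relpath| matches any of the blacklist patterns."""
  return any(re.match(pattern, relpath) for pattern in blacklist)
# e.g. 'foo/bar.pyc' and '.git' match; 'foo/bar.py' does not.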


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    data = stream.read(chunk_size)
    if not data:
      break
    yield data


def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  if not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
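

# Illustrative sketch (not in the original file): file_read() and file_write()
# are symmetrical generators/consumers, so copying a file is just feeding one
# into the other. The helper name below is hypothetical.
def _example_copy_file(src, dst):
  """Copies |src| to |dst| in chunks, returning bytes written."""
  return file_write(dst, file_read(src))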


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
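

# Illustrative sketch (not in the original file): zip_compress() and
# zip_decompress() both operate on generators of chunks, so they compose
# directly. The helper name below is hypothetical.
def _example_zip_round_trip(data):
  """Compresses then decompresses |data|; the result equals the input."""
  return ''.join(zip_decompress(zip_compress([data])))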


def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # splitext() keeps the leading dot; strip it so the lookup in
  # ALREADY_COMPRESSED_TYPES (which stores bare extensions) can match.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      os.mkdir(os.path.join(base_directory, d))


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101
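

# Illustrative sketch (not in the original file): the |files| these helpers
# expect mirrors the 'files' section of a .isolated file, where a property
# dict with an 'l' key describes a symlink and its target:
#   files = {
#     'out/data.txt': {'h': '<sha-1 digest>', 's': 123},
#     'out/link': {'l': 'data.txt'},
#   }
#   create_directories('/tmp/run', files)
#   create_symlinks('/tmp/run', files.items())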


def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from content(). If digest is provided, it MUST correspond to
  the hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to the 'contains' thread, then to the 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else os.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]
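

# Illustrative sketch (not in the original file): how an Item acquires its
# digest and size. BufferItem knows its size up front but not its digest;
# prepare() derives it from content() using the namespace's hash algorithm
# (sha-1 is assumed here, matching the default namespace):
#   import hashlib
#   item = BufferItem('hello')
#   item.prepare(hashlib.sha1)
#   # item.size == 5, item.digest == sha-1 hex digest of 'hello'.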


class Storage(object):
  """Efficiently downloads or uploads large sets of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handler table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bit userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only the first Item that matches it. All other
    # items are just indistinguishable copies from the point of view of the
    # isolate server (it doesn't care about paths at all, only content and
    # digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns a URL that can be used to fetch the given item once uploaded.

    Note that if the namespace uses compression, the data at the given URL is
    compressed.

    Arguments:
      item: Item to get a fetch URL for.

    Returns:
      A URL or None if the underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with
    a 'get_missing_items' call. 'get_missing_items' returns a |push_state|
    object that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts an asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific
            information describing how to upload the item (for example in
            case of cloud storage, it is signed upload URLs). It can later
            be passed to 'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
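

# Illustrative sketch (not in the original file): typical Storage usage. Using
# it as a context manager joins the thread pools and restores signal handlers
# on exit. The server URL, namespace and file path below are made-up values:
#   api = IsolateServer('https://isolate.example.com', 'default-gzip')
#   with Storage(api) as storage:
#     uploaded = storage.upload_items(
#         [FileItem('out/foo.bin'), BufferItem('hello')])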


def batch_items_for_check(items):
  """Splits a list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries
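

# Illustrative sketch (not in the original file): with the default
# ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100), a list of 250 items
# is yielded as batches of 20, 20, 50, 50, 50 and then 60, so the largest
# (most likely changed) files are queried in small, cheap batches first:
#   batches = list(batch_items_for_check(items))
#   [len(b) for b in batches] == [20, 20, 50, 50, 50, 60]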


class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
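

# Illustrative sketch (not in the original file): the intended add()/wait()
# pattern for FetchQueue, processing digests in whatever order they complete.
# |storage|, |cache| and |wanted| (a dict digest -> size) are assumptions:
#   queue = FetchQueue(storage, cache)
#   for digest, size in wanted.iteritems():
#     queue.add(digest, size)
#   remaining = set(wanted)
#   while remaining:
#     remaining.discard(queue.wait(remaining))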


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions
    since otherwise Storage will retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with the given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch
          from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by a 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object, to be passed to
      'push'. See the doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = '_ah/api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by the /server_details
      endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that the server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self.do_fetch(source_url, digest, offset)

    if not response:
      raise IOError('Attempted to fetch from %s; no data exists.' % source_url)

    # For content stored inline in the datastore.
    content = response.get('content')
    if content is not None:
      return base64.b64decode(content)

    # For Google Storage entities.
    connection = net.url_open(response['url'])

    # If |offset|, verify the server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure the returned offset equals the requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure the entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001035
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001036 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001037 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001038 assert item.digest is not None
1039 assert item.size is not None
1040 assert isinstance(push_state, _IsolateServerPushState)
1041 assert not push_state.finalized
1042
1043 # Default to item.content().
1044 content = item.content() if content is None else content
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001045 logging.info('Push state size: %d', push_state.size)
1046 if isinstance(content, (basestring, list)):
1047 # Memory is already used, too late.
1048 with self._lock:
1049 self._memory_use += push_state.size
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001050 else:
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001051 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1052 # If |content| is indeed a generator, it can not be re-winded back to the
1053 # beginning of the stream. A retry will find it exhausted. A possible
1054 # solution is to wrap |content| generator with some sort of caching
1055 # restartable generator. It should be done alongside streaming support
1056 # implementation.
1057 #
1058 # In theory, we should keep the generator, so that it is not serialized in
1059 # memory. Sadly net.HttpService.request() requires the body to be
1060 # serialized.
1061 assert isinstance(content, types.GeneratorType), repr(content)
1062 slept = False
1063 # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
Marc-Antoine Ruele6677c82015-02-05 14:54:22 -05001064 # One byte less than 512mb. This is to cope with incompressible content.
1065 max_size = int(sys.maxsize * 0.25)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001066 while True:
1067 with self._lock:
1068 # This is due to 32 bits python when uploading very large files. The
1069 # problem is that it's comparing uncompressed sizes, while we care
1070 # about compressed sizes since it's what is serialized in memory.
1071 # The first check assumes large files are compressible and that by
1072 # throttling one upload at once, we can survive. Otherwise, kaboom.
1073 memory_use = self._memory_use
1074 if ((push_state.size >= max_size and not memory_use) or
1075 (memory_use + push_state.size <= max_size)):
1076 self._memory_use += push_state.size
1077 memory_use = self._memory_use
1078 break
1079 time.sleep(0.1)
1080 slept = True
1081 if slept:
1082 logging.info('Unblocked: %d %d', memory_use, push_state.size)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001083
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001084 try:
1085 # This push operation may be a retry after failed finalization call below,
1086 # no need to reupload contents in that case.
1087 if not push_state.uploaded:
1088 # PUT file to |upload_url|.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001089 success = self.do_push(push_state, content)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001090 if not success:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001091 raise IOError('Failed to upload file with hash %s to URL %s' % (
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001092 item.digest, push_state.upload_url))
1093 push_state.uploaded = True
1094 else:
1095 logging.info(
1096 'A file %s already uploaded, retrying finalization only',
1097 item.digest)
1098
1099 # Optionally notify the server that it's done.
1100 if push_state.finalize_url:
1101 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1102 # send it to isolated server. That way isolate server can verify that
1103 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1104 # stored files).
1105 # TODO(maruel): Fix the server to accept properly data={} so
1106 # url_read_json() can be used.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001107 response = net.url_read_json(
1108 url='%s/%s' % (self._base_url, push_state.finalize_url),
1109 data={
1110 'upload_ticket': push_state.preupload_status['upload_ticket'],
1111 })
1112 if not response or not response['ok']:
1113 raise IOError('Failed to finalize file with hash %s.' % item.digest)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001114 push_state.finalized = True
1115 finally:
1116 with self._lock:
1117 self._memory_use -= push_state.size
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001118
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001119 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001120 # Ensure all items were initialized with 'prepare' call. Storage does that.
1121 assert all(i.digest is not None and i.size is not None for i in items)
1122
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001123 # Request body is a json encoded list of dicts.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001124 body = {
1125 'items': [
1126 {
1127 'digest': item.digest,
1128 'is_isolated': bool(item.high_priority),
1129 'size': item.size,
1130 } for item in items
1131 ],
1132 'namespace': self._namespace_dict,
1133 }
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001134
Cory Massarocc19c8c2015-03-10 13:35:11 -07001135 query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001136
 1137 # Response body lists a preupload status dict for each missing file.
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001138 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001139 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001140 response = net.url_read_json(url=query_url, data=body)
1141 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001142 raise isolated_format.MappingError(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001143 'Failed to execute preupload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001144 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001145 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001146 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001147
 1148 # Pick Items that are missing, attach _IsolateServerPushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001149 missing_items = {}
Cory Massarocc19c8c2015-03-10 13:35:11 -07001150 for preupload_status in response.get('items', []):
1151 assert 'upload_ticket' in preupload_status, (
1152 preupload_status, '/preupload did not generate an upload ticket')
1153 index = int(preupload_status['index'])
1154 missing_items[items[index]] = _IsolateServerPushState(
1155 preupload_status, items[index].size)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001156 logging.info('Queried %d files, %d cache hits',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001157 len(items), len(items) - len(missing_items))
1158 return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001159
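  # A hedged illustration of the /preupload wire format that contains() above
  # sends and receives; the digest, ticket and _namespace_dict shape below are
  # made-up examples, only the structure mirrors the code.
  @staticmethod
  def _example_preupload_payloads():
    """Returns (request, response) dicts shaped like contains() traffic."""
    request_body = {
      'items': [
        {
          'digest': '0123456789abcdef0123456789abcdef01234567',
          'is_isolated': True,
          'size': 1024,
        },
      ],
      'namespace': {'namespace': 'default-gzip'},  # assumed _namespace_dict.
    }
    # The response lists an entry only for each *missing* item; 'index' points
    # back into the request's 'items' list and 'upload_ticket' authorizes the
    # subsequent push.
    response_body = {
      'items': [{'index': '0', 'upload_ticket': 'fake-ticket'}],
    }
    return request_body, response_body
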
Cory Massarocc19c8c2015-03-10 13:35:11 -07001160 def do_fetch(self, url, digest, offset):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001161 """Fetches isolated data from the URL.
1162
1163 Used only for fetching files, not for API calls. Can be overridden in
1164 subclasses.
1165
 1166 Args:
 1167 url: URL to fetch the data from, can possibly return http redirect.
 1168 digest: hash digest of the item to fetch.
 1169 offset: byte offset inside the file to start fetching from.
 1170 
 1171 Returns: parsed JSON response, as returned by net.url_read_json().
 1172 """
Cory Massarocc19c8c2015-03-10 13:35:11 -07001173 assert isinstance(offset, int)
1174 data = {
1175 'digest': digest.encode('utf-8'),
1176 'namespace': self._namespace_dict,
1177 'offset': offset,
1178 }
1179 return net.url_read_json(
1180 url=url,
1181 data=data,
1182 read_timeout=DOWNLOAD_READ_TIMEOUT)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001183
Cory Massarocc19c8c2015-03-10 13:35:11 -07001184 def do_push(self, push_state, content):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001185 """Uploads isolated file to the URL.
1186
1187 Used only for storing files, not for API calls. Can be overridden in
1188 subclasses.
1189
1190 Args:
 1191 push_state: an _IsolateServerPushState instance.
 1192 content: an iterable that yields 'str' chunks.
 1193 
 1194 Returns: True on success, False otherwise.
 1195 """
 1196 # A cheesy way to avoid a memcpy of a (possibly huge) file, until
 1197 # streaming upload support is implemented.
1198 if isinstance(content, list) and len(content) == 1:
1199 content = content[0]
1200 else:
1201 content = ''.join(content)
Cory Massarocc19c8c2015-03-10 13:35:11 -07001202
1203 # DB upload
1204 if not push_state.finalize_url:
1205 url = '%s/%s' % (self._base_url, push_state.upload_url)
1206 content = base64.b64encode(content)
1207 data = {
1208 'upload_ticket': push_state.preupload_status['upload_ticket'],
1209 'content': content,
1210 }
1211 response = net.url_read_json(url=url, data=data)
1212 return response is not None and response['ok']
1213
1214 # upload to GS
1215 url = push_state.upload_url
Vadim Shtayura8623c272014-12-01 11:45:27 -08001216 response = net.url_read(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001217 content_type='application/octet-stream',
1218 data=content,
1219 method='PUT',
1220 url=url)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001221 return response is not None
1222
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001223
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001224class LocalCache(object):
1225 """Local cache that stores objects fetched via Storage.
1226
1227 It can be accessed concurrently from multiple threads, so it should protect
1228 its internal state with some lock.
1229 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001230 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001231
1232 def __enter__(self):
1233 """Context manager interface."""
1234 return self
1235
1236 def __exit__(self, _exc_type, _exec_value, _traceback):
1237 """Context manager interface."""
1238 return False
1239
1240 def cached_set(self):
1241 """Returns a set of all cached digests (always a new object)."""
1242 raise NotImplementedError()
1243
1244 def touch(self, digest, size):
1245 """Ensures item is not corrupted and updates its LRU position.
1246
1247 Arguments:
1248 digest: hash digest of item to check.
1249 size: expected size of this item.
1250
1251 Returns:
1252 True if item is in cache and not corrupted.
1253 """
1254 raise NotImplementedError()
1255
1256 def evict(self, digest):
1257 """Removes item from cache if it's there."""
1258 raise NotImplementedError()
1259
1260 def read(self, digest):
1261 """Returns contents of the cached item as a single str."""
1262 raise NotImplementedError()
1263
1264 def write(self, digest, content):
1265 """Reads data from |content| generator and stores it in cache."""
1266 raise NotImplementedError()
1267
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001268 def hardlink(self, digest, dest, file_mode):
1269 """Ensures file at |dest| has same content as cached |digest|.
1270
1271 If file_mode is provided, it is used to set the executable bit if
1272 applicable.
1273 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001274 raise NotImplementedError()
1275
1276
1277class MemoryCache(LocalCache):
1278 """LocalCache implementation that stores everything in memory."""
1279
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001280 def __init__(self, file_mode_mask=0500):
1281 """Args:
1282 file_mode_mask: bit mask to AND file mode with. Default value will make
1283 all mapped files to be read only.
1284 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001285 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001286 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001287 # Let's not assume dict is thread safe.
1288 self._lock = threading.Lock()
1289 self._contents = {}
1290
1291 def cached_set(self):
1292 with self._lock:
1293 return set(self._contents)
1294
1295 def touch(self, digest, size):
1296 with self._lock:
1297 return digest in self._contents
1298
1299 def evict(self, digest):
1300 with self._lock:
1301 self._contents.pop(digest, None)
1302
1303 def read(self, digest):
1304 with self._lock:
1305 return self._contents[digest]
1306
1307 def write(self, digest, content):
1308 # Assemble whole stream before taking the lock.
1309 data = ''.join(content)
1310 with self._lock:
1311 self._contents[digest] = data
1312
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001313 def hardlink(self, digest, dest, file_mode):
1314 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001315 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001316 if file_mode is not None:
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001317 os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001318
1319
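# A minimal round-trip through the LocalCache interface via MemoryCache; the
# digest value is a made-up placeholder, not a real hash.
def _example_memory_cache_usage():
  digest = '0' * 40
  with MemoryCache() as cache:
    # write() consumes an iterable of str chunks; read() returns a single str.
    cache.write(digest, ['hello ', 'world'])
    assert cache.touch(digest, size=11)
    return cache.read(digest)  # 'hello world'

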
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001320class CachePolicies(object):
1321 def __init__(self, max_cache_size, min_free_space, max_items):
1322 """
1323 Arguments:
1324 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1325 cache is effectively a leak.
 1326 - min_free_space: Trim if disk free space becomes lower than this value. If
 1327 0, the cache may unconditionally fill the disk.
1328 - max_items: Maximum number of items to keep in the cache. If 0, do not
1329 enforce a limit.
1330 """
1331 self.max_cache_size = max_cache_size
1332 self.min_free_space = min_free_space
1333 self.max_items = max_items
1334
1335
1336class DiskCache(LocalCache):
1337 """Stateful LRU cache in a flat hash table in a directory.
1338
1339 Saves its state as json file.
1340 """
1341 STATE_FILE = 'state.json'
1342
1343 def __init__(self, cache_dir, policies, hash_algo):
1344 """
1345 Arguments:
1346 cache_dir: directory where to place the cache.
1347 policies: cache retention policies.
 1348 hash_algo: hashing algorithm used.
1349 """
1350 super(DiskCache, self).__init__()
1351 self.cache_dir = cache_dir
1352 self.policies = policies
1353 self.hash_algo = hash_algo
1354 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
1355
1356 # All protected methods (starting with '_') except _path should be called
1357 # with this lock locked.
1358 self._lock = threading_utils.LockWithAssert()
1359 self._lru = lru.LRUDict()
1360
1361 # Profiling values.
1362 self._added = []
1363 self._removed = []
1364 self._free_disk = 0
1365
1366 with tools.Profiler('Setup'):
1367 with self._lock:
1368 self._load()
1369
1370 def __enter__(self):
1371 return self
1372
1373 def __exit__(self, _exc_type, _exec_value, _traceback):
1374 with tools.Profiler('CleanupTrimming'):
1375 with self._lock:
1376 self._trim()
1377
1378 logging.info(
1379 '%5d (%8dkb) added',
1380 len(self._added), sum(self._added) / 1024)
1381 logging.info(
1382 '%5d (%8dkb) current',
1383 len(self._lru),
1384 sum(self._lru.itervalues()) / 1024)
1385 logging.info(
1386 '%5d (%8dkb) removed',
1387 len(self._removed), sum(self._removed) / 1024)
1388 logging.info(
1389 ' %8dkb free',
1390 self._free_disk / 1024)
1391 return False
1392
1393 def cached_set(self):
1394 with self._lock:
1395 return self._lru.keys_set()
1396
1397 def touch(self, digest, size):
1398 """Verifies an actual file is valid.
1399
 1400 Note that it doesn't compute the hash, so the file could still be corrupted
 1401 if the file size didn't change.
1402
1403 TODO(maruel): More stringent verification while keeping the check fast.
1404 """
1405 # Do the check outside the lock.
1406 if not is_valid_file(self._path(digest), size):
1407 return False
1408
 1409 # Update its LRU position.
1410 with self._lock:
1411 if digest not in self._lru:
1412 return False
1413 self._lru.touch(digest)
1414 return True
1415
1416 def evict(self, digest):
1417 with self._lock:
1418 self._lru.pop(digest)
1419 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1420
1421 def read(self, digest):
1422 with open(self._path(digest), 'rb') as f:
1423 return f.read()
1424
1425 def write(self, digest, content):
Marc-Antoine Rueldf4976d2015-04-15 19:56:21 -04001426 assert content is not None
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001427 path = self._path(digest)
 1428 # A stale broken file may remain. It is possible for the file to have the
 1429 # write access bit removed, which would cause the file_write() call to fail
 1430 # to open it in write mode. Take no chances here.
1431 file_path.try_remove(path)
1432 try:
1433 size = file_write(path, content)
1434 except:
1435 # There are two possible places were an exception can occur:
1436 # 1) Inside |content| generator in case of network or unzipping errors.
1437 # 2) Inside file_write itself in case of disk IO errors.
1438 # In any case delete an incomplete file and propagate the exception to
1439 # caller, it will be logged there.
1440 file_path.try_remove(path)
1441 raise
 1442 # Make the file read-only in the cache. This has a few side-effects since
 1443 # the file node is modified, so every directory entry to this file becomes
 1444 # read-only. It's fine here because it is a new file.
1445 file_path.set_read_only(path, True)
1446 with self._lock:
1447 self._add(digest, size)
1448
1449 def hardlink(self, digest, dest, file_mode):
1450 """Hardlinks the file to |dest|.
1451
1452 Note that the file permission bits are on the file node, not the directory
1453 entry, so changing the access bit on any of the directory entries for the
1454 file node will affect them all.
1455 """
1456 path = self._path(digest)
1457 # TODO(maruel): file_path.HARDLINK_WITH_FALLBACK ?
1458 file_path.hardlink(path, dest)
1459 if file_mode is not None:
1460 # Ignores all other bits.
1461 os.chmod(dest, file_mode & 0500)
1462
1463 def _load(self):
1464 """Loads state of the cache from json file."""
1465 self._lock.assert_locked()
1466
1467 if not os.path.isdir(self.cache_dir):
1468 os.makedirs(self.cache_dir)
1469 else:
1470 # Make sure the cache is read-only.
1471 # TODO(maruel): Calculate the cost and optimize the performance
1472 # accordingly.
1473 file_path.make_tree_read_only(self.cache_dir)
1474
1475 # Load state of the cache.
1476 if os.path.isfile(self.state_file):
1477 try:
1478 self._lru = lru.LRUDict.load(self.state_file)
1479 except ValueError as err:
1480 logging.error('Failed to load cache state: %s' % (err,))
1481 # Don't want to keep broken state file.
1482 file_path.try_remove(self.state_file)
1483
1484 # Ensure that all files listed in the state still exist and add new ones.
1485 previous = self._lru.keys_set()
1486 unknown = []
1487 for filename in os.listdir(self.cache_dir):
1488 if filename == self.STATE_FILE:
1489 continue
1490 if filename in previous:
1491 previous.remove(filename)
1492 continue
1493 # An untracked file.
1494 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1495 logging.warning('Removing unknown file %s from cache', filename)
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001496 p = self._path(filename)
1497 if os.path.isdir(p):
1498 try:
1499 file_path.rmtree(p)
1500 except OSError:
1501 pass
1502 else:
1503 file_path.try_remove(p)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001504 continue
1505 # File that's not referenced in 'state.json'.
1506 # TODO(vadimsh): Verify its SHA1 matches file name.
1507 logging.warning('Adding unknown file %s to cache', filename)
1508 unknown.append(filename)
1509
1510 if unknown:
1511 # Add as oldest files. They will be deleted eventually if not accessed.
1512 self._add_oldest_list(unknown)
1513 logging.warning('Added back %d unknown files', len(unknown))
1514
1515 if previous:
1516 # Filter out entries that were not found.
1517 logging.warning('Removed %d lost files', len(previous))
1518 for filename in previous:
1519 self._lru.pop(filename)
1520 self._trim()
1521
1522 def _save(self):
1523 """Saves the LRU ordering."""
1524 self._lock.assert_locked()
1525 if sys.platform != 'win32':
1526 d = os.path.dirname(self.state_file)
1527 if os.path.isdir(d):
1528 # Necessary otherwise the file can't be created.
1529 file_path.set_read_only(d, False)
1530 if os.path.isfile(self.state_file):
1531 file_path.set_read_only(self.state_file, False)
1532 self._lru.save(self.state_file)
1533
1534 def _trim(self):
1535 """Trims anything we don't know, make sure enough free space exists."""
1536 self._lock.assert_locked()
1537
1538 # Ensure maximum cache size.
1539 if self.policies.max_cache_size:
1540 total_size = sum(self._lru.itervalues())
1541 while total_size > self.policies.max_cache_size:
1542 total_size -= self._remove_lru_file()
1543
1544 # Ensure maximum number of items in the cache.
1545 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1546 for _ in xrange(len(self._lru) - self.policies.max_items):
1547 self._remove_lru_file()
1548
1549 # Ensure enough free space.
1550 self._free_disk = file_path.get_free_space(self.cache_dir)
1551 trimmed_due_to_space = False
1552 while (
1553 self.policies.min_free_space and
1554 self._lru and
1555 self._free_disk < self.policies.min_free_space):
1556 trimmed_due_to_space = True
1557 self._remove_lru_file()
1558 self._free_disk = file_path.get_free_space(self.cache_dir)
1559 if trimmed_due_to_space:
1560 total_usage = sum(self._lru.itervalues())
1561 usage_percent = 0.
 1562 if self.policies.max_cache_size:
 1563 usage_percent = 100. * total_usage / float(self.policies.max_cache_size)
1564 logging.warning(
1565 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1566 'cache (%.1f%% of its maximum capacity)',
1567 self._free_disk / 1024.,
1568 total_usage / 1024.,
1569 usage_percent)
1570 self._save()
1571
1572 def _path(self, digest):
1573 """Returns the path to one item."""
1574 return os.path.join(self.cache_dir, digest)
1575
1576 def _remove_lru_file(self):
1577 """Removes the last recently used file and returns its size."""
1578 self._lock.assert_locked()
1579 digest, size = self._lru.pop_oldest()
1580 self._delete_file(digest, size)
1581 return size
1582
1583 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1584 """Adds an item into LRU cache marking it as a newest one."""
1585 self._lock.assert_locked()
1586 if size == UNKNOWN_FILE_SIZE:
1587 size = os.stat(self._path(digest)).st_size
1588 self._added.append(size)
1589 self._lru.add(digest, size)
1590
1591 def _add_oldest_list(self, digests):
1592 """Adds a bunch of items into LRU cache marking them as oldest ones."""
1593 self._lock.assert_locked()
1594 pairs = []
1595 for digest in digests:
1596 size = os.stat(self._path(digest)).st_size
1597 self._added.append(size)
1598 pairs.append((digest, size))
1599 self._lru.batch_insert_oldest(pairs)
1600
1601 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1602 """Deletes cache file from the file system."""
1603 self._lock.assert_locked()
1604 try:
1605 if size == UNKNOWN_FILE_SIZE:
1606 size = os.stat(self._path(digest)).st_size
1607 file_path.try_remove(self._path(digest))
1608 self._removed.append(size)
1609 except OSError as e:
1610 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1611
1612
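# A hedged sketch of wiring up DiskCache, mirroring process_cache_options()
# near the bottom of this file; the policy numbers and namespace are arbitrary
# examples, not recommendations.
def _example_disk_cache_usage(cache_dir):
  policies = CachePolicies(
      max_cache_size=20*1024*1024*1024,  # trim above 20gb
      min_free_space=2*1024*1024*1024,   # keep at least 2gb of disk free
      max_items=100000)
  algo = isolated_format.get_hash_algo('default-gzip')
  # Trimming to the policies happens when the context manager exits.
  with DiskCache(unicode(os.path.abspath(cache_dir)), policies, algo) as cache:
    return cache.cached_set()

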
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001613class IsolatedBundle(object):
1614 """Fetched and parsed .isolated file with all dependencies."""
1615
Vadim Shtayura3148e072014-09-02 18:51:52 -07001616 def __init__(self):
1617 self.command = []
1618 self.files = {}
1619 self.read_only = None
1620 self.relative_cwd = None
1621 # The main .isolated file, a IsolatedFile instance.
1622 self.root = None
1623
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001624 def fetch(self, fetch_queue, root_isolated_hash, algo):
1625 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001626
1627 It enables support for "included" .isolated files. They are processed in
1628 strict order but fetched asynchronously from the cache. This is important so
1629 that a file in an included .isolated file that is overridden by an embedding
1630 .isolated file is not fetched needlessly. The includes are fetched in one
1631 pass and the files are fetched as soon as all the ones on the left-side
 1632 of the tree have been fetched.
1633
1634 The prioritization is very important here for nested .isolated files.
1635 'includes' have the highest priority and the algorithm is optimized for both
1636 deep and wide trees. A deep one is a long link of .isolated files referenced
1637 one at a time by one item in 'includes'. A wide one has a large number of
1638 'includes' in a single .isolated file. 'left' is defined as an included
1639 .isolated file earlier in the 'includes' list. So the order of the elements
1640 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001641
1642 As a side effect this method starts asynchronous fetch of all data files
1643 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1644 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001645 """
1646 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1647
1648 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1649 pending = {}
1650 # Set of hashes of already retrieved items to refuse recursive includes.
1651 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001652 # Set of IsolatedFile's whose data files have already been fetched.
1653 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001654
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001655 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001656 h = isolated_file.obj_hash
1657 if h in seen:
1658 raise isolated_format.IsolatedError(
1659 'IsolatedFile %s is retrieved recursively' % h)
1660 assert h not in pending
1661 seen.add(h)
1662 pending[h] = isolated_file
1663 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1664
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001665 # Start fetching root *.isolated file (single file, not the whole bundle).
1666 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001667
1668 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001669 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001670 item_hash = fetch_queue.wait(pending)
1671 item = pending.pop(item_hash)
1672 item.load(fetch_queue.cache.read(item_hash))
Vadim Shtayura3148e072014-09-02 18:51:52 -07001673
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001674 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001675 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001676 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001677
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001678 # Always fetch *.isolated files in traversal order, waiting if necessary
1679 # until next to-be-processed node loads. "Waiting" is done by yielding
1680 # back to the outer loop, that waits until some *.isolated is loaded.
1681 for node in isolated_format.walk_includes(self.root):
1682 if node not in processed:
1683 # Not visited, and not yet loaded -> wait for it to load.
1684 if not node.is_loaded:
1685 break
1686 # Not visited and loaded -> process it and continue the traversal.
1687 self._start_fetching_files(node, fetch_queue)
1688 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001689
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001690 # All *.isolated files should be processed by now and only them.
1691 all_isolateds = set(isolated_format.walk_includes(self.root))
1692 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001693
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001694 # Extract 'command' and other bundle properties.
1695 for node in isolated_format.walk_includes(self.root):
1696 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001697 self.relative_cwd = self.relative_cwd or ''
1698
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001699 def _start_fetching_files(self, isolated, fetch_queue):
1700 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001701
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001702 Modifies self.files.
1703 """
1704 logging.debug('fetch_files(%s)', isolated.obj_hash)
1705 for filepath, properties in isolated.data.get('files', {}).iteritems():
1706 # Root isolated has priority on the files being mapped. In particular,
1707 # overridden files must not be fetched.
1708 if filepath not in self.files:
1709 self.files[filepath] = properties
1710 if 'h' in properties:
1711 # Preemptively request files.
1712 logging.debug('fetching %s', filepath)
1713 fetch_queue.add(
1714 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1715
1716 def _update_self(self, node):
1717 """Extracts bundle global parameters from loaded *.isolated file.
1718
1719 Will be called with each loaded *.isolated file in order of traversal of
1720 isolated include graph (see isolated_format.walk_includes).
1721 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001722 # Grabs properties.
1723 if not self.command and node.data.get('command'):
1724 # Ensure paths are correctly separated on windows.
1725 self.command = node.data['command']
1726 if self.command:
1727 self.command[0] = self.command[0].replace('/', os.path.sep)
1728 self.command = tools.fix_python_path(self.command)
1729 if self.read_only is None and node.data.get('read_only') is not None:
1730 self.read_only = node.data['read_only']
1731 if (self.relative_cwd is None and
1732 node.data.get('relative_cwd') is not None):
1733 self.relative_cwd = node.data['relative_cwd']
1734
1735
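# The override rule from _start_fetching_files() above, restated as a hedged
# standalone sketch: the entry seen first in include-traversal order wins and
# later duplicates are never fetched. Paths and hashes below are made up.
def _example_include_override():
  files = {}
  root_entries = {'a.txt': {'h': 'hash-from-root', 's': 1}}
  included_entries = {'a.txt': {'h': 'hash-from-include', 's': 2}}
  for entries in (root_entries, included_entries):
    for path, props in entries.iteritems():
      if path not in files:
        files[path] = props
  return files  # {'a.txt': {'h': 'hash-from-root', 's': 1}}

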
Vadim Shtayura8623c272014-12-01 11:45:27 -08001736def set_storage_api_class(cls):
1737 """Replaces StorageApi implementation used by default."""
1738 global _storage_api_cls
1739 assert _storage_api_cls is None
1740 assert issubclass(cls, StorageApi)
1741 _storage_api_cls = cls
1742
1743
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001744def get_storage_api(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001745 """Returns an object that implements low-level StorageApi interface.
1746
 1747 It is used by Storage to work with a single isolate |namespace|. It should
1748 rarely be used directly by clients, see 'get_storage' for
1749 a better alternative.
1750
1751 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001752 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001753 namespace: isolate namespace to operate in, also defines hashing and
1754 compression scheme used, i.e. namespace names that end with '-gzip'
1755 store compressed data.
1756
1757 Returns:
1758 Instance of StorageApi subclass.
1759 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001760 cls = _storage_api_cls or IsolateServer
1761 return cls(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001762
1763
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001764def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001765 """Returns Storage class that can upload and download from |namespace|.
1766
1767 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001768 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001769 namespace: isolate namespace to operate in, also defines hashing and
1770 compression scheme used, i.e. namespace names that end with '-gzip'
1771 store compressed data.
1772
1773 Returns:
1774 Instance of Storage.
1775 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001776 return Storage(get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001777
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001778
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001779def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001780 """Uploads the given tree to the given url.
1781
1782 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001783 base_url: The url of the isolate server to upload to.
1784 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001785 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001786 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001787 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001788 # Filter out symlinks, since they are not represented by items on isolate
1789 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001790 items = []
1791 seen = set()
1792 skipped = 0
1793 for filepath, metadata in infiles:
1794 if 'l' not in metadata and filepath not in seen:
1795 seen.add(filepath)
1796 item = FileItem(
1797 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001798 digest=metadata['h'],
1799 size=metadata['s'],
1800 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001801 items.append(item)
1802 else:
1803 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001804
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001805 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001806 with get_storage(base_url, namespace) as storage:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001807 storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001808
1809
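# A hedged example of the |infiles| shape expected by upload_tree(); the
# server URL, paths and digests are placeholders.
def _example_upload_tree_call():
  infiles = [
    # 'h' is the hash digest, 's' the size in bytes; 'priority' == '0' marks
    # high priority entries such as .isolated files.
    ('/abs/path/foo.isolated',
     {'h': '0123456789abcdef0123456789abcdef01234567', 's': 512,
      'priority': '0'}),
    # Entries with 'l' (symlinks) are skipped by upload_tree().
    ('/abs/path/link', {'l': 'foo.isolated'}),
  ]
  upload_tree('https://isolate.example.com', infiles, 'default-gzip')

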
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001810def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001811 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001812
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001813 Arguments:
1814 isolated_hash: hash of the root *.isolated file.
1815 storage: Storage class that communicates with isolate storage.
1816 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001817 outdir: Output directory to map file tree to.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001818 require_command: Ensure *.isolated specifies a command to run.
1819
1820 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001821 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001822 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001823 logging.debug(
1824 'fetch_isolated(%s, %s, %s, %s, %s)',
1825 isolated_hash, storage, cache, outdir, require_command)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001826 # Hash algorithm to use, defined by namespace |storage| is using.
1827 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001828 with cache:
1829 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001830 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001831
1832 with tools.Profiler('GetIsolateds'):
1833 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001834 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001835 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
1836 try:
1837 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1838 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001839 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001840 '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1841 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001842
1843 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001844 bundle.fetch(fetch_queue, isolated_hash, algo)
1845 if require_command and not bundle.command:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001846 # TODO(vadimsh): All fetch operations are already enqueue and there's no
1847 # easy way to cancel them.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001848 raise isolated_format.IsolatedError('No command to run')
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001849
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001850 with tools.Profiler('GetRest'):
1851 # Create file system hierarchy.
1852 if not os.path.isdir(outdir):
1853 os.makedirs(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001854 create_directories(outdir, bundle.files)
1855 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001856
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001857 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001858 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001859 if not os.path.isdir(cwd):
1860 os.makedirs(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001861
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001862 # Multimap: digest -> list of pairs (path, props).
1863 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001864 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001865 if 'h' in props:
1866 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001867
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001868 # Now block on the remaining files to be downloaded and mapped.
1869 logging.info('Retrieving remaining files (%d of them)...',
1870 fetch_queue.pending_count)
1871 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001872 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001873 while remaining:
1874 detector.ping()
1875
1876 # Wait for any item to finish fetching to cache.
1877 digest = fetch_queue.wait(remaining)
1878
1879 # Link corresponding files to a fetched item in cache.
1880 for filepath, props in remaining.pop(digest):
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001881 cache.hardlink(
1882 digest, os.path.join(outdir, filepath), props.get('m'))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001883
1884 # Report progress.
1885 duration = time.time() - last_update
1886 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1887 msg = '%d files remaining...' % len(remaining)
1888 print msg
1889 logging.info(msg)
1890 last_update = time.time()
1891
1892 # Cache could evict some items we just tried to fetch, it's a fatal error.
1893 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001894 raise isolated_format.MappingError(
1895 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001896 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001897
1898
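# A hedged sketch of calling fetch_isolated(), mirroring CMDdownload below;
# the server URL is a placeholder, and MemoryCache trades cross-run reuse for
# simplicity (pass a DiskCache to keep downloads across runs).
def _example_fetch_isolated(isolated_hash, outdir):
  with get_storage('https://isolate.example.com', 'default-gzip') as storage:
    # fetch_isolated() enters the cache's context manager itself.
    return fetch_isolated(
        isolated_hash=isolated_hash,
        storage=storage,
        cache=MemoryCache(),
        outdir=outdir,
        require_command=False)

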
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001899def directory_to_metadata(root, algo, blacklist):
1900 """Returns the FileItem list and .isolated metadata for a directory."""
1901 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001902 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001903 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001904 metadata = {
1905 relpath: isolated_format.file_to_metadata(
Marc-Antoine Ruelf1d827c2014-11-24 15:22:25 -05001906 os.path.join(root, relpath), {}, 0, algo)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001907 for relpath in paths
1908 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001909 for v in metadata.itervalues():
1910 v.pop('t')
1911 items = [
1912 FileItem(
1913 path=os.path.join(root, relpath),
1914 digest=meta['h'],
1915 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001916 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001917 for relpath, meta in metadata.iteritems() if 'h' in meta
1918 ]
1919 return items, metadata
1920
1921
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001922def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001923 """Stores every entry and returns the relevant data.
1924
1925 Arguments:
1926 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001927 files: list of file paths to upload. If a directory is specified, a
1928 .isolated file is created and its hash is returned.
1929 blacklist: function that returns True if a file should be omitted.
1930 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001931 assert all(isinstance(i, unicode) for i in files), files
1932 if len(files) != len(set(map(os.path.abspath, files))):
1933 raise Error('Duplicate entries found.')
1934
1935 results = []
1936 # The temporary directory is only created as needed.
1937 tempdir = None
1938 try:
1939 # TODO(maruel): Yield the files to a worker thread.
1940 items_to_upload = []
1941 for f in files:
1942 try:
1943 filepath = os.path.abspath(f)
1944 if os.path.isdir(filepath):
1945 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001946 items, metadata = directory_to_metadata(
1947 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001948
1949 # Create the .isolated file.
1950 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04001951 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
1952 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001953 os.close(handle)
1954 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001955 'algo':
1956 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001957 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001958 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001959 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001960 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001961 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001962 items_to_upload.extend(items)
1963 items_to_upload.append(
1964 FileItem(
1965 path=isolated,
1966 digest=h,
1967 size=os.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001968 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001969 results.append((h, f))
1970
1971 elif os.path.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001972 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001973 items_to_upload.append(
1974 FileItem(
1975 path=filepath,
1976 digest=h,
1977 size=os.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001978 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001979 results.append((h, f))
1980 else:
 1981 raise Error('%s is neither a file nor a directory.' % f)
1982 except OSError:
1983 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001984 # Technically we would care about which files were uploaded, but in
 1985 # practice we don't.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001986 _uploaded_files = storage.upload_items(items_to_upload)
1987 return results
1988 finally:
Marc-Antoine Ruel1b7bfec2015-02-11 15:35:42 -05001989 if tempdir and os.path.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001990 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001991
1992
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001993def archive(out, namespace, files, blacklist):
1994 if files == ['-']:
1995 files = sys.stdin.readlines()
1996
1997 if not files:
1998 raise Error('Nothing to upload')
1999
2000 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002001 blacklist = tools.gen_blacklist(blacklist)
2002 with get_storage(out, namespace) as storage:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07002003 results = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002004 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
2005
2006
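# Example invocations of the subcommands below (a hedged illustration; the
# server URL is a placeholder, not a real instance):
#
#   isolateserver.py archive -I https://isolate.example.com some_dir
#   isolateserver.py download -I https://isolate.example.com \
#       -s 0123456789abcdef0123456789abcdef01234567 --target out
#
# 'archive' prints '<hash> <path>' for each entry; 'download' recreates the
# isolated tree under --target.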
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002007@subcommand.usage('<file1..fileN> or - to read from stdin')
2008def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002009 """Archives data to the server.
2010
 2011 If a directory is specified, a .isolated file is created and the whole
 2012 directory is uploaded. Then this .isolated file can be included in another
 2013 one to run commands.
2014
 2015 The command outputs each file that was processed with its content hash. For
2016 directories, the .isolated generated for the directory is listed as the
2017 directory entry itself.
2018 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002019 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002020 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002021 options, files = parser.parse_args(args)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002022 process_isolate_server_options(parser, options, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002023 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002024 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002025 except Error as e:
2026 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002027 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002028
2029
2030def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002031 """Download data from the server.
2032
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002033 It can either download individual files or a complete tree from a .isolated
2034 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002035 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002036 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002037 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05002038 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002039 help='hash of an isolated file, .isolated file content is discarded, use '
2040 '--file if you need it')
2041 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002042 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
2043 help='hash and destination of a file, can be used multiple times')
2044 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002045 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002046 help='destination directory')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002047 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002048 options, args = parser.parse_args(args)
2049 if args:
2050 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002051
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002052 process_isolate_server_options(parser, options, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002053 if bool(options.isolated) == bool(options.file):
2054 parser.error('Use one of --isolated or --file, and only one.')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002055
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002056 cache = process_cache_options(options)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002057 options.target = os.path.abspath(options.target)
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002058 if options.isolated:
2059 if (os.path.isfile(options.target) or
2060 (os.path.isdir(options.target) and os.listdir(options.target))):
2061 parser.error(
2062 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002063 with get_storage(options.isolate_server, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08002064 # Fetching individual files.
2065 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002066 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08002067 channel = threading_utils.TaskChannel()
2068 pending = {}
2069 for digest, dest in options.file:
2070 pending[digest] = dest
2071 storage.async_fetch(
2072 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002073 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002074 digest,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002075 UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002076 functools.partial(file_write, os.path.join(options.target, dest)))
2077 while pending:
2078 fetched = channel.pull()
2079 dest = pending.pop(fetched)
2080 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002081
Vadim Shtayura3172be52013-12-03 12:49:05 -08002082 # Fetching whole isolated tree.
2083 if options.isolated:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002084 with cache:
2085 bundle = fetch_isolated(
2086 isolated_hash=options.isolated,
2087 storage=storage,
2088 cache=cache,
2089 outdir=options.target,
2090 require_command=False)
2091 if bundle.command:
2092 rel = os.path.join(options.target, bundle.relative_cwd)
 2093 print('To run this test please run from the directory %s:' %
 2094 rel)
2095 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002096
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002097 return 0
2098
2099
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002100def add_archive_options(parser):
2101 parser.add_option(
2102 '--blacklist',
2103 action='append', default=list(DEFAULT_BLACKLIST),
2104 help='List of regexp to use as blacklist filter when uploading '
2105 'directories')
2106
2107
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002108def add_isolate_server_options(parser):
2109 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002110 parser.add_option(
2111 '-I', '--isolate-server',
2112 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002113 help='URL of the Isolate Server to use. Defaults to the environment '
2114 'variable ISOLATE_SERVER if set. No need to specify https://, this '
2115 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002116 parser.add_option(
2117 '--namespace', default='default-gzip',
2118 help='The namespace to use on the Isolate Server, default: %default')
2119
2120
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002121def process_isolate_server_options(parser, options, set_exception_handler):
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002122 """Processes the --isolate-server option and aborts if not specified.
2123
2124 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002125 """
2126 if not options.isolate_server:
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002127 parser.error('--isolate-server is required.')
Marc-Antoine Ruel012067b2014-12-10 15:45:42 -05002128 try:
2129 options.isolate_server = net.fix_url(options.isolate_server)
2130 except ValueError as e:
2131 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002132 if set_exception_handler:
2133 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002134 try:
2135 return auth.ensure_logged_in(options.isolate_server)
2136 except ValueError as e:
2137 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002138
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002139
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002140def add_cache_options(parser):
2141 cache_group = optparse.OptionGroup(parser, 'Cache management')
2142 cache_group.add_option(
2143 '--cache', metavar='DIR',
2144 help='Directory to keep a local cache of the files. Accelerates download '
2145 'by reusing already downloaded files. Default=%default')
2146 cache_group.add_option(
2147 '--max-cache-size',
2148 type='int',
2149 metavar='NNN',
2150 default=20*1024*1024*1024,
2151 help='Trim if the cache gets larger than this value, default=%default')
2152 cache_group.add_option(
2153 '--min-free-space',
2154 type='int',
2155 metavar='NNN',
2156 default=2*1024*1024*1024,
2157 help='Trim if disk free space becomes lower than this value, '
2158 'default=%default')
2159 cache_group.add_option(
2160 '--max-items',
2161 type='int',
2162 metavar='NNN',
2163 default=100000,
2164 help='Trim if more than this number of items are in the cache '
2165 'default=%default')
2166 parser.add_option_group(cache_group)
2167
2168
2169def process_cache_options(options):
2170 if options.cache:
2171 policies = CachePolicies(
2172 options.max_cache_size, options.min_free_space, options.max_items)
2173
2174 # |options.cache| path may not exist until DiskCache() instance is created.
2175 return DiskCache(
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002176 unicode(os.path.abspath(options.cache)),
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002177 policies,
2178 isolated_format.get_hash_algo(options.namespace))
2179 else:
2180 return MemoryCache()
2181
2182
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002183class OptionParserIsolateServer(tools.OptionParserWithLogging):
2184 def __init__(self, **kwargs):
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002185 tools.OptionParserWithLogging.__init__(
2186 self,
2187 version=__version__,
2188 prog=os.path.basename(sys.modules[__name__].__file__),
2189 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002190 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002191
2192 def parse_args(self, *args, **kwargs):
2193 options, args = tools.OptionParserWithLogging.parse_args(
2194 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002195 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002196 return options, args
2197
2198
2199def main(args):
2200 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04002201 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002202
2203
2204if __name__ == '__main__':
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002205 fix_encoding.fix_encoding()
2206 tools.disable_buffering()
2207 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002208 sys.exit(main(sys.argv[1:]))