blob: 0fda8445bf01b55e3ec72108dcf7be4f2c8c1233 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04008__version__ = '0.3.4'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040012import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000014import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040015import signal
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000016import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050017import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000018import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000019import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000020import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050021import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000022import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000023
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000024from third_party import colorama
25from third_party.depot_tools import fix_encoding
26from third_party.depot_tools import subcommand
27
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050028from utils import file_path
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040029from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000030from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040031from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000032from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000033from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000034
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080035import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040036import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080037
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000038
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0]
# files are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousands files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
# NOTE(review): entries are bare extensions without the leading dot; compare
# against get_zip_compression_level(), which derives the extension via
# os.path.splitext().
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of files never to be archived, applied to relative paths.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
    r'^.+\.(?:run_test_cases)$',
    r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
104
105
class Error(Exception):
  """Base class for runtime errors raised by this module."""
110
class Aborted(Error):
  """Raised when an in-flight operation is cancelled (e.g. on Ctrl+C)."""
115
def stream_read(stream, chunk_size):
  """Yields successive |chunk_size|-byte reads from |stream| until EOF."""
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
124
def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Reads the file at |filepath| and yields its content in |chunk_size| pieces.

  Reading starts at byte |offset| of the file.
  """
  with open(filepath, 'rb') as stream:
    if offset:
      stream.seek(offset)
    piece = stream.read(chunk_size)
    while piece:
      yield piece
      piece = stream.read(chunk_size)
135
136
def file_write(filepath, content_generator):
  """Streams the chunks produced by |content_generator| into |filepath|.

  Missing intermediary directories are created first.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent_dir = os.path.dirname(filepath)
  if not os.path.isdir(parent_dir):
    os.makedirs(parent_dir)
  written = 0
  with open(filepath, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000155
156
def zip_compress(content_generator, level=7):
  """Yields zlib-compressed chunks for the data read from |content_generator|.

  Chunks that compress to nothing (buffered inside zlib) are skipped; the
  final flush emits whatever remains.
  """
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    packed = deflater.compress(piece)
    if packed:
      yield packed
  final = deflater.flush(zlib.Z_FINISH)
  if final:
    yield final
167
168
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses in pieces no larger than |chunk_size| so that a zip bomb does
  not make zlib preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  bytes_in = 0
  try:
    for piece in content_generator:
      bytes_in += len(piece)
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      # Drain whatever zlib buffered beyond the |chunk_size| cap.
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (bytes_in, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
199
200
def get_zip_compression_level(filename):
  """Returns the ideal zip compression level to use for |filename|.

  File types that are already compressed get level 0 (store only); everything
  else gets level 7.
  """
  # os.path.splitext() keeps the leading dot ('.png'), but
  # ALREADY_COMPRESSED_TYPES lists bare extensions ('png'); strip the dot so
  # the membership test can actually match. Without this, every file was
  # recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
206
207
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file.
  needed = set()
  for relpath in files:
    parent = os.path.dirname(relpath)
    while parent:
      needed.add(parent)
      parent = os.path.dirname(parent)
  # Lexicographic order guarantees parents are created before children.
  for directory in sorted(needed):
    os.mkdir(os.path.join(base_directory, directory))
220
221
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for relpath, props in files:
    # Only entries carrying an 'l' property describe symlinks.
    if 'l' not in props:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
      continue
    destination = os.path.join(base_directory, relpath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(props['l'], destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000234
235
def is_valid_file(filepath, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    # Size unknown: existence is the only check available.
    return os.path.isfile(filepath)
  found_size = os.stat(filepath).st_size
  if found_size == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), found_size, size)
  return False
250
251
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Content hash and byte count; left as-is if supplied, otherwise filled
    # in by prepare().
    self.digest = digest
    self.size = size
    # High priority items jump the upload queue.
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size is already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    byte_count = 0
    for chunk in self.content():
      hasher.update(chunk)
      byte_count += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = byte_count
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000291
292
293class FileItem(Item):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800294 """A file to push to Storage.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000295
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800296 Its digest and size may be provided in advance, if known. Otherwise they will
297 be derived from the file content.
298 """
299
300 def __init__(self, path, digest=None, size=None, high_priority=False):
301 super(FileItem, self).__init__(
302 digest,
303 size if size is not None else os.stat(path).st_size,
304 high_priority)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000305 self.path = path
306 self.compression_level = get_zip_compression_level(path)
307
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800308 def content(self):
309 return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000310
311
class BufferItem(Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Size is known upfront; digest is computed lazily by prepare().
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    """Returns the whole buffer as a single-chunk iterable."""
    return [self.buffer]
321
322
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    """Wraps a StorageApi instance; namespace settings are derived from it.

    Arguments:
      storage_api: StorageApi object performing the actual transfers; its
          |namespace| defines the hashing and compression scheme.
    """
    self._storage_api = storage_api
    # True if data in this namespace is stored zlib-compressed.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    # Hash factory used to derive item digests (see hash_algo property).
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily by the corresponding properties.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Set by abort(); read without a lock, see the comment in abort().
    self._aborted = False
    # Signal handlers replaced by __enter__, restored by __exit__.
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """Location of a backing store that this class is using.

    Exact meaning depends on the storage_api type. For IsolateServer it is
    an URL of isolate server, for FileSystem is it a path in file system.
    """
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    # Created on first use; 'zip' is the pool's name used for logging/tracing.
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    # Join before close: let in-flight tasks drain, then tear the pool down.
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly theadsafe, but in the worst case the logging message
    # will be printed twice. Not a big deal. In other places it is assumed that
    # unprotected reads and writes to _aborted are serializable (it is true
    # for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    # Install Ctrl+C / SIGTERM handlers that route into abort(); remember the
    # previous handlers so __exit__ can restore them.
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    # Restore whatever signal handlers were installed before __enter__.
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns an URL that can be used to fetch given item once it's uploaded.

    Note that if namespace uses compression, data at given URL is compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    # prepare() guarantees item.digest is set before it is used below.
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      # Bail out early if abort() was called; the Aborted exception surfaces
      # to the caller through the channel.
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Hand the fully zipped buffer off to the network pool for upload.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      # NOTE(review): an Aborted raised here presumably propagates to the
      # caller via channel.pull() - confirm against threading_utils.
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000652
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000653
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800654def batch_items_for_check(items):
655 """Splits list of items to check for existence on the server into batches.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000656
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800657 Each batch corresponds to a single 'exists?' query to the server via a call
658 to StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000659
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800660 Arguments:
661 items: a list of Item objects.
662
663 Yields:
664 Batches of items to query for existence in a single operation,
665 each batch is a list of Item objects.
666 """
667 batch_count = 0
668 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
669 next_queries = []
670 for item in sorted(items, key=lambda x: x.size, reverse=True):
671 next_queries.append(item)
672 if len(next_queries) == batch_size_limit:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000673 yield next_queries
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800674 next_queries = []
675 batch_count += 1
676 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
677 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
678 if next_queries:
679 yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000680
681
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000682class FetchQueue(object):
683 """Fetches items from Storage and places them into LocalCache.
684
685 It manages multiple concurrent fetch operations. Acts as a bridge between
686 Storage and LocalCache so that Storage and LocalCache don't depend on each
687 other at all.
688 """
689
690 def __init__(self, storage, cache):
691 self.storage = storage
692 self.cache = cache
693 self._channel = threading_utils.TaskChannel()
694 self._pending = set()
695 self._accessed = set()
696 self._fetched = cache.cached_set()
697
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400698 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700699 self,
700 digest,
701 size=UNKNOWN_FILE_SIZE,
702 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000703 """Starts asynchronous fetch of item |digest|."""
704 # Fetching it now?
705 if digest in self._pending:
706 return
707
708 # Mark this file as in use, verify_all_cached will later ensure it is still
709 # in cache.
710 self._accessed.add(digest)
711
712 # Already fetched? Notify cache to update item's LRU position.
713 if digest in self._fetched:
714 # 'touch' returns True if item is in cache and not corrupted.
715 if self.cache.touch(digest, size):
716 return
717 # Item is corrupted, remove it from cache and fetch it again.
718 self._fetched.remove(digest)
719 self.cache.evict(digest)
720
721 # TODO(maruel): It should look at the free disk space, the current cache
722 # size and the size of the new item on every new item:
723 # - Trim the cache as more entries are listed when free disk space is low,
724 # otherwise if the amount of data downloaded during the run > free disk
725 # space, it'll crash.
726 # - Make sure there's enough free disk space to fit all dependencies of
727 # this run! If not, abort early.
728
729 # Start fetching.
730 self._pending.add(digest)
731 self.storage.async_fetch(
732 self._channel, priority, digest, size,
733 functools.partial(self.cache.write, digest))
734
735 def wait(self, digests):
736 """Starts a loop that waits for at least one of |digests| to be retrieved.
737
738 Returns the first digest retrieved.
739 """
740 # Flush any already fetched items.
741 for digest in digests:
742 if digest in self._fetched:
743 return digest
744
745 # Ensure all requested items are being fetched now.
746 assert all(digest in self._pending for digest in digests), (
747 digests, self._pending)
748
749 # Wait for some requested item to finish fetching.
750 while self._pending:
751 digest = self._channel.pull()
752 self._pending.remove(digest)
753 self._fetched.add(digest)
754 if digest in digests:
755 return digest
756
757 # Should never reach this point due to assert above.
758 raise RuntimeError('Impossible state')
759
760 def inject_local_file(self, path, algo):
761 """Adds local file to the cache as if it was fetched from storage."""
762 with open(path, 'rb') as f:
763 data = f.read()
764 digest = algo(data).hexdigest()
765 self.cache.write(digest, [data])
766 self._fetched.add(digest)
767 return digest
768
769 @property
770 def pending_count(self):
771 """Returns number of items to be fetched."""
772 return len(self._pending)
773
774 def verify_all_cached(self):
775 """True if all accessed items are in cache."""
776 return self._accessed.issubset(self.cache.cached_set())
777
778
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Keep one chunk buffered in |held_back| so the whole stream can be
    # validated before its final chunk reaches the consumer.
    held_back = None
    for chunk in self.stream:
      assert chunk is not None
      if held_back is not None:
        self._inspect_chunk(held_back, is_last=False)
        try:
          yield held_back
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      held_back = chunk
    if held_back is not None:
      self._inspect_chunk(held_back, is_last=True)
      try:
        yield held_back
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    # Only the total size can be checked, and only once the stream ended and
    # an expected size was provided at all.
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
825
826
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000827class StorageApi(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800828 """Interface for classes that implement low-level storage operations.
829
830 StorageApi is oblivious of compression and hashing scheme used. This details
831 are handled in higher level Storage class.
832
833 Clients should generally not use StorageApi directly. Storage class is
834 preferred since it implements compression and upload optimizations.
835 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000836
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700837 @property
838 def location(self):
839 """Location of a backing store that this class is using.
840
841 Exact meaning depends on the type. For IsolateServer it is an URL of isolate
842 server, for FileSystem is it a path in file system.
843 """
844 raise NotImplementedError()
845
846 @property
847 def namespace(self):
848 """Isolate namespace used by this storage.
849
850 Indirectly defines hashing scheme and compression method used.
851 """
852 raise NotImplementedError()
853
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000854 def get_fetch_url(self, digest):
855 """Returns an URL that can be used to fetch an item with given digest.
856
857 Arguments:
858 digest: hex digest of item to fetch.
859
860 Returns:
861 An URL or None if the protocol doesn't support this.
862 """
863 raise NotImplementedError()
864
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800865 def fetch(self, digest, offset=0):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000866 """Fetches an object and yields its content.
867
868 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000869 digest: hash digest of item to download.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800870 offset: offset (in bytes) from the start of the file to resume fetch from.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000871
872 Yields:
873 Chunks of downloaded item (as str objects).
874 """
875 raise NotImplementedError()
876
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800877 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000878 """Uploads an |item| with content generated by |content| generator.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000879
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800880 |item| MUST go through 'contains' call to get |push_state| before it can
881 be pushed to the storage.
882
883 To be clear, here is one possible usage:
884 all_items = [... all items to push as Item subclasses ...]
885 for missing_item, push_state in storage_api.contains(all_items).items():
886 storage_api.push(missing_item, push_state)
887
888 When pushing to a namespace with compression, data that should be pushed
889 and data provided by the item is not the same. In that case |content| is
890 not None and it yields chunks of compressed data (using item.content() as
891 a source of original uncompressed data). This is implemented by Storage
892 class.
893
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000894 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000895 item: Item object that holds information about an item being pushed.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800896 push_state: push state object as returned by 'contains' call.
897 content: a generator that yields chunks to push, item.content() if None.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000898
899 Returns:
900 None.
901 """
902 raise NotImplementedError()
903
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000904 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800905 """Checks for |items| on the server, prepares missing ones for upload.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000906
907 Arguments:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800908 items: list of Item objects to check for presence.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000909
910 Returns:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800911 A dict missing Item -> opaque push state object to be passed to 'push'.
912 See doc string for 'push'.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000913 """
914 raise NotImplementedError()
915
916
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800917class _IsolateServerPushState(object):
918 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -0500919
920 Note this needs to be a global class to support pickling.
921 """
922
923 def __init__(self, upload_url, finalize_url):
924 self.upload_url = upload_url
925 self.finalize_url = finalize_url
926 self.uploaded = False
927 self.finalized = False
928
929
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000930class IsolateServer(StorageApi):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000931 """StorageApi implementation that downloads and uploads to Isolate Server.
932
933 It uploads and downloads directly from Google Storage whenever appropriate.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800934 Works only within single namespace.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000935 """
936
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000937 def __init__(self, base_url, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000938 super(IsolateServer, self).__init__()
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000939 assert base_url.startswith('http'), base_url
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700940 self._base_url = base_url.rstrip('/')
941 self._namespace = namespace
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000942 self._lock = threading.Lock()
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000943 self._server_caps = None
944
945 @staticmethod
946 def _generate_handshake_request():
947 """Returns a dict to be sent as handshake request body."""
948 # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
949 return {
950 'client_app_version': __version__,
951 'fetcher': True,
952 'protocol_version': ISOLATE_PROTOCOL_VERSION,
953 'pusher': True,
954 }
955
956 @staticmethod
957 def _validate_handshake_response(caps):
958 """Validates and normalizes handshake response."""
959 logging.info('Protocol version: %s', caps['protocol_version'])
960 logging.info('Server version: %s', caps['server_app_version'])
961 if caps.get('error'):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400962 raise isolated_format.MappingError(caps['error'])
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000963 if not caps['access_token']:
964 raise ValueError('access_token is missing')
965 return caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000966
967 @property
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000968 def _server_capabilities(self):
969 """Performs handshake with the server if not yet done.
970
971 Returns:
972 Server capabilities dictionary as returned by /handshake endpoint.
973
974 Raises:
975 MappingError if server rejects the handshake.
976 """
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000977 # TODO(maruel): Make this request much earlier asynchronously while the
978 # files are being enumerated.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800979
980 # TODO(vadimsh): Put |namespace| in the URL so that server can apply
981 # namespace-level ACLs to this call.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000982 with self._lock:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000983 if self._server_caps is None:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000984 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -0400985 caps = net.url_read_json(
986 url=self._base_url + '/content-gs/handshake',
987 data=self._generate_handshake_request())
988 if caps is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400989 raise isolated_format.MappingError('Failed to perform handshake.')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000990 if not isinstance(caps, dict):
991 raise ValueError('Expecting JSON dict')
992 self._server_caps = self._validate_handshake_response(caps)
993 except (ValueError, KeyError, TypeError) as exc:
994 # KeyError exception has very confusing str conversion: it's just a
995 # missing key value and nothing else. So print exception class name
996 # as well.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400997 raise isolated_format.MappingError(
Marc-Antoine Ruel1e7658c2014-08-28 19:46:39 -0400998 'Invalid handshake response (%s): %s' % (
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000999 exc.__class__.__name__, exc))
1000 return self._server_caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001001
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001002 @property
1003 def location(self):
1004 return self._base_url
1005
1006 @property
1007 def namespace(self):
1008 return self._namespace
1009
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001010 def get_fetch_url(self, digest):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001011 assert isinstance(digest, basestring)
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001012 return '%s/content-gs/retrieve/%s/%s' % (
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001013 self._base_url, self._namespace, digest)
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001014
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001015 def fetch(self, digest, offset=0):
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001016 source_url = self.get_fetch_url(digest)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001017 logging.debug('download_file(%s, %d)', source_url, offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001018
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001019 connection = net.url_open(
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001020 source_url,
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001021 read_timeout=DOWNLOAD_READ_TIMEOUT,
1022 headers={'Range': 'bytes=%d-' % offset} if offset else None)
1023
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001024 if not connection:
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001025 raise IOError('Request failed - %s' % source_url)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001026
1027 # If |offset| is used, verify server respects it by checking Content-Range.
1028 if offset:
1029 content_range = connection.get_header('Content-Range')
1030 if not content_range:
1031 raise IOError('Missing Content-Range header')
1032
1033 # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
1034 # According to a spec, <size> can be '*' meaning "Total size of the file
1035 # is not known in advance".
1036 try:
1037 match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
1038 if not match:
1039 raise ValueError()
1040 content_offset = int(match.group(1))
1041 last_byte_index = int(match.group(2))
1042 size = None if match.group(3) == '*' else int(match.group(3))
1043 except ValueError:
1044 raise IOError('Invalid Content-Range header: %s' % content_range)
1045
1046 # Ensure returned offset equals requested one.
1047 if offset != content_offset:
1048 raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
1049 offset, content_offset, content_range))
1050
1051 # Ensure entire tail of the file is returned.
1052 if size is not None and last_byte_index + 1 != size:
1053 raise IOError('Incomplete response. Content-Range: %s' % content_range)
1054
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001055 return stream_read(connection, NET_IO_FILE_CHUNK)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001056
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001057 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001058 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001059 assert item.digest is not None
1060 assert item.size is not None
1061 assert isinstance(push_state, _IsolateServerPushState)
1062 assert not push_state.finalized
1063
1064 # Default to item.content().
1065 content = item.content() if content is None else content
1066
1067 # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
1068 if isinstance(content, basestring):
1069 assert not isinstance(content, unicode), 'Unicode string is not allowed'
1070 content = [content]
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001071
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001072 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1073 # If |content| is indeed a generator, it can not be re-winded back
1074 # to the beginning of the stream. A retry will find it exhausted. A possible
1075 # solution is to wrap |content| generator with some sort of caching
1076 # restartable generator. It should be done alongside streaming support
1077 # implementation.
1078
1079 # This push operation may be a retry after failed finalization call below,
1080 # no need to reupload contents in that case.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001081 if not push_state.uploaded:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001082 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
1083 # upload support is implemented.
1084 if isinstance(content, list) and len(content) == 1:
1085 content = content[0]
1086 else:
1087 content = ''.join(content)
1088 # PUT file to |upload_url|.
1089 response = net.url_read(
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001090 url=push_state.upload_url,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001091 data=content,
1092 content_type='application/octet-stream',
1093 method='PUT')
1094 if response is None:
1095 raise IOError('Failed to upload a file %s to %s' % (
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001096 item.digest, push_state.upload_url))
1097 push_state.uploaded = True
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001098 else:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001099 logging.info(
1100 'A file %s already uploaded, retrying finalization only', item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001101
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001102 # Optionally notify the server that it's done.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001103 if push_state.finalize_url:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001104 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1105 # send it to isolated server. That way isolate server can verify that
1106 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1107 # stored files).
Marc-Antoine Ruelc1c2ccc2014-08-13 19:18:49 -04001108 # TODO(maruel): Fix the server to accept propery data={} so
1109 # url_read_json() can be used.
1110 response = net.url_read(
1111 url=push_state.finalize_url,
1112 data='',
1113 content_type='application/json',
1114 method='POST')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001115 if response is None:
1116 raise IOError('Failed to finalize an upload of %s' % item.digest)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001117 push_state.finalized = True
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001118
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001119 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001120 # Ensure all items were initialized with 'prepare' call. Storage does that.
1121 assert all(i.digest is not None and i.size is not None for i in items)
1122
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001123 # Request body is a json encoded list of dicts.
1124 body = [
1125 {
1126 'h': item.digest,
1127 's': item.size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001128 'i': int(item.high_priority),
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001129 } for item in items
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001130 ]
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001131
1132 query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001133 self._base_url,
1134 self._namespace,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001135 urllib.quote(self._server_capabilities['access_token']))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001136
1137 # Response body is a list of push_urls (or null if file is already present).
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001138 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001139 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001140 response = net.url_read_json(url=query_url, data=body)
1141 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001142 raise isolated_format.MappingError(
1143 'Failed to execute /pre-upload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001144 if not isinstance(response, list):
1145 raise ValueError('Expecting response with json-encoded list')
1146 if len(response) != len(items):
1147 raise ValueError(
1148 'Incorrect number of items in the list, expected %d, '
1149 'but got %d' % (len(items), len(response)))
1150 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001151 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001152 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001153
1154 # Pick Items that are missing, attach _PushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001155 missing_items = {}
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001156 for i, push_urls in enumerate(response):
1157 if push_urls:
1158 assert len(push_urls) == 2, str(push_urls)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001159 missing_items[items[i]] = _IsolateServerPushState(
1160 push_urls[0], push_urls[1])
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001161 logging.info('Queried %d files, %d cache hit',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001162 len(items), len(items) - len(missing_items))
1163 return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001164
1165
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001166class FileSystem(StorageApi):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001167 """StorageApi implementation that fetches data from the file system.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001168
1169 The common use case is a NFS/CIFS file server that is mounted locally that is
1170 used to fetch the file on a local partition.
1171 """
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001172
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001173 # Used for push_state instead of None. That way caller is forced to
1174 # call 'contains' before 'push'. Naively passing None in 'push' will not work.
1175 _DUMMY_PUSH_STATE = object()
1176
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001177 def __init__(self, base_path, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001178 super(FileSystem, self).__init__()
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001179 self._base_path = base_path
1180 self._namespace = namespace
1181
1182 @property
1183 def location(self):
1184 return self._base_path
1185
1186 @property
1187 def namespace(self):
1188 return self._namespace
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001189
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +00001190 def get_fetch_url(self, digest):
1191 return None
1192
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -08001193 def fetch(self, digest, offset=0):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001194 assert isinstance(digest, basestring)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001195 return file_read(os.path.join(self._base_path, digest), offset=offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001196
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001197 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001198 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001199 assert item.digest is not None
1200 assert item.size is not None
1201 assert push_state is self._DUMMY_PUSH_STATE
1202 content = item.content() if content is None else content
1203 if isinstance(content, basestring):
1204 assert not isinstance(content, unicode), 'Unicode string is not allowed'
1205 content = [content]
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001206 file_write(os.path.join(self._base_path, item.digest), content)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001207
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001208 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001209 assert all(i.digest is not None and i.size is not None for i in items)
1210 return dict(
1211 (item, self._DUMMY_PUSH_STATE) for item in items
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001212 if not os.path.exists(os.path.join(self._base_path, item.digest))
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001213 )
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001214
1215
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001216class LocalCache(object):
1217 """Local cache that stores objects fetched via Storage.
1218
1219 It can be accessed concurrently from multiple threads, so it should protect
1220 its internal state with some lock.
1221 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001222 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001223
1224 def __enter__(self):
1225 """Context manager interface."""
1226 return self
1227
1228 def __exit__(self, _exc_type, _exec_value, _traceback):
1229 """Context manager interface."""
1230 return False
1231
1232 def cached_set(self):
1233 """Returns a set of all cached digests (always a new object)."""
1234 raise NotImplementedError()
1235
1236 def touch(self, digest, size):
1237 """Ensures item is not corrupted and updates its LRU position.
1238
1239 Arguments:
1240 digest: hash digest of item to check.
1241 size: expected size of this item.
1242
1243 Returns:
1244 True if item is in cache and not corrupted.
1245 """
1246 raise NotImplementedError()
1247
1248 def evict(self, digest):
1249 """Removes item from cache if it's there."""
1250 raise NotImplementedError()
1251
1252 def read(self, digest):
1253 """Returns contents of the cached item as a single str."""
1254 raise NotImplementedError()
1255
1256 def write(self, digest, content):
1257 """Reads data from |content| generator and stores it in cache."""
1258 raise NotImplementedError()
1259
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001260 def hardlink(self, digest, dest, file_mode):
1261 """Ensures file at |dest| has same content as cached |digest|.
1262
1263 If file_mode is provided, it is used to set the executable bit if
1264 applicable.
1265 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001266 raise NotImplementedError()
1267
1268
1269class MemoryCache(LocalCache):
1270 """LocalCache implementation that stores everything in memory."""
1271
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001272 def __init__(self, file_mode_mask=0500):
1273 """Args:
1274 file_mode_mask: bit mask to AND file mode with. Default value will make
1275 all mapped files to be read only.
1276 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001277 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001278 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001279 # Let's not assume dict is thread safe.
1280 self._lock = threading.Lock()
1281 self._contents = {}
1282
1283 def cached_set(self):
1284 with self._lock:
1285 return set(self._contents)
1286
1287 def touch(self, digest, size):
1288 with self._lock:
1289 return digest in self._contents
1290
1291 def evict(self, digest):
1292 with self._lock:
1293 self._contents.pop(digest, None)
1294
1295 def read(self, digest):
1296 with self._lock:
1297 return self._contents[digest]
1298
1299 def write(self, digest, content):
1300 # Assemble whole stream before taking the lock.
1301 data = ''.join(content)
1302 with self._lock:
1303 self._contents[digest] = data
1304
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001305 def hardlink(self, digest, dest, file_mode):
1306 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001307 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001308 if file_mode is not None:
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001309 os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001310
1311
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001312class CachePolicies(object):
1313 def __init__(self, max_cache_size, min_free_space, max_items):
1314 """
1315 Arguments:
1316 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1317 cache is effectively a leak.
1318 - min_free_space: Trim if disk free space becomes lower than this value. If
1319 0, it unconditionally fill the disk.
1320 - max_items: Maximum number of items to keep in the cache. If 0, do not
1321 enforce a limit.
1322 """
1323 self.max_cache_size = max_cache_size
1324 self.min_free_space = min_free_space
1325 self.max_items = max_items
1326
1327
1328class DiskCache(LocalCache):
1329 """Stateful LRU cache in a flat hash table in a directory.
1330
1331 Saves its state as json file.
1332 """
1333 STATE_FILE = 'state.json'
1334
1335 def __init__(self, cache_dir, policies, hash_algo):
1336 """
1337 Arguments:
1338 cache_dir: directory where to place the cache.
1339 policies: cache retention policies.
1340 algo: hashing algorithm used.
1341 """
1342 super(DiskCache, self).__init__()
1343 self.cache_dir = cache_dir
1344 self.policies = policies
1345 self.hash_algo = hash_algo
1346 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
1347
1348 # All protected methods (starting with '_') except _path should be called
1349 # with this lock locked.
1350 self._lock = threading_utils.LockWithAssert()
1351 self._lru = lru.LRUDict()
1352
1353 # Profiling values.
1354 self._added = []
1355 self._removed = []
1356 self._free_disk = 0
1357
1358 with tools.Profiler('Setup'):
1359 with self._lock:
1360 self._load()
1361
1362 def __enter__(self):
1363 return self
1364
1365 def __exit__(self, _exc_type, _exec_value, _traceback):
1366 with tools.Profiler('CleanupTrimming'):
1367 with self._lock:
1368 self._trim()
1369
1370 logging.info(
1371 '%5d (%8dkb) added',
1372 len(self._added), sum(self._added) / 1024)
1373 logging.info(
1374 '%5d (%8dkb) current',
1375 len(self._lru),
1376 sum(self._lru.itervalues()) / 1024)
1377 logging.info(
1378 '%5d (%8dkb) removed',
1379 len(self._removed), sum(self._removed) / 1024)
1380 logging.info(
1381 ' %8dkb free',
1382 self._free_disk / 1024)
1383 return False
1384
1385 def cached_set(self):
1386 with self._lock:
1387 return self._lru.keys_set()
1388
1389 def touch(self, digest, size):
1390 """Verifies an actual file is valid.
1391
1392 Note that is doesn't compute the hash so it could still be corrupted if the
1393 file size didn't change.
1394
1395 TODO(maruel): More stringent verification while keeping the check fast.
1396 """
1397 # Do the check outside the lock.
1398 if not is_valid_file(self._path(digest), size):
1399 return False
1400
1401 # Update it's LRU position.
1402 with self._lock:
1403 if digest not in self._lru:
1404 return False
1405 self._lru.touch(digest)
1406 return True
1407
1408 def evict(self, digest):
1409 with self._lock:
1410 self._lru.pop(digest)
1411 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1412
1413 def read(self, digest):
1414 with open(self._path(digest), 'rb') as f:
1415 return f.read()
1416
1417 def write(self, digest, content):
1418 path = self._path(digest)
1419 # A stale broken file may remain. It is possible for the file to have write
1420 # access bit removed which would cause the file_write() call to fail to open
1421 # in write mode. Take no chance here.
1422 file_path.try_remove(path)
1423 try:
1424 size = file_write(path, content)
1425 except:
1426 # There are two possible places were an exception can occur:
1427 # 1) Inside |content| generator in case of network or unzipping errors.
1428 # 2) Inside file_write itself in case of disk IO errors.
1429 # In any case delete an incomplete file and propagate the exception to
1430 # caller, it will be logged there.
1431 file_path.try_remove(path)
1432 raise
1433 # Make the file read-only in the cache. This has a few side-effects since
1434 # the file node is modified, so every directory entries to this file becomes
1435 # read-only. It's fine here because it is a new file.
1436 file_path.set_read_only(path, True)
1437 with self._lock:
1438 self._add(digest, size)
1439
1440 def hardlink(self, digest, dest, file_mode):
1441 """Hardlinks the file to |dest|.
1442
1443 Note that the file permission bits are on the file node, not the directory
1444 entry, so changing the access bit on any of the directory entries for the
1445 file node will affect them all.
1446 """
1447 path = self._path(digest)
1448 # TODO(maruel): file_path.HARDLINK_WITH_FALLBACK ?
1449 file_path.hardlink(path, dest)
1450 if file_mode is not None:
1451 # Ignores all other bits.
1452 os.chmod(dest, file_mode & 0500)
1453
1454 def _load(self):
1455 """Loads state of the cache from json file."""
1456 self._lock.assert_locked()
1457
1458 if not os.path.isdir(self.cache_dir):
1459 os.makedirs(self.cache_dir)
1460 else:
1461 # Make sure the cache is read-only.
1462 # TODO(maruel): Calculate the cost and optimize the performance
1463 # accordingly.
1464 file_path.make_tree_read_only(self.cache_dir)
1465
1466 # Load state of the cache.
1467 if os.path.isfile(self.state_file):
1468 try:
1469 self._lru = lru.LRUDict.load(self.state_file)
1470 except ValueError as err:
1471 logging.error('Failed to load cache state: %s' % (err,))
1472 # Don't want to keep broken state file.
1473 file_path.try_remove(self.state_file)
1474
1475 # Ensure that all files listed in the state still exist and add new ones.
1476 previous = self._lru.keys_set()
1477 unknown = []
1478 for filename in os.listdir(self.cache_dir):
1479 if filename == self.STATE_FILE:
1480 continue
1481 if filename in previous:
1482 previous.remove(filename)
1483 continue
1484 # An untracked file.
1485 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1486 logging.warning('Removing unknown file %s from cache', filename)
1487 file_path.try_remove(self._path(filename))
1488 continue
1489 # File that's not referenced in 'state.json'.
1490 # TODO(vadimsh): Verify its SHA1 matches file name.
1491 logging.warning('Adding unknown file %s to cache', filename)
1492 unknown.append(filename)
1493
1494 if unknown:
1495 # Add as oldest files. They will be deleted eventually if not accessed.
1496 self._add_oldest_list(unknown)
1497 logging.warning('Added back %d unknown files', len(unknown))
1498
1499 if previous:
1500 # Filter out entries that were not found.
1501 logging.warning('Removed %d lost files', len(previous))
1502 for filename in previous:
1503 self._lru.pop(filename)
1504 self._trim()
1505
1506 def _save(self):
1507 """Saves the LRU ordering."""
1508 self._lock.assert_locked()
1509 if sys.platform != 'win32':
1510 d = os.path.dirname(self.state_file)
1511 if os.path.isdir(d):
1512 # Necessary otherwise the file can't be created.
1513 file_path.set_read_only(d, False)
1514 if os.path.isfile(self.state_file):
1515 file_path.set_read_only(self.state_file, False)
1516 self._lru.save(self.state_file)
1517
1518 def _trim(self):
1519 """Trims anything we don't know, make sure enough free space exists."""
1520 self._lock.assert_locked()
1521
1522 # Ensure maximum cache size.
1523 if self.policies.max_cache_size:
1524 total_size = sum(self._lru.itervalues())
1525 while total_size > self.policies.max_cache_size:
1526 total_size -= self._remove_lru_file()
1527
1528 # Ensure maximum number of items in the cache.
1529 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1530 for _ in xrange(len(self._lru) - self.policies.max_items):
1531 self._remove_lru_file()
1532
1533 # Ensure enough free space.
1534 self._free_disk = file_path.get_free_space(self.cache_dir)
1535 trimmed_due_to_space = False
1536 while (
1537 self.policies.min_free_space and
1538 self._lru and
1539 self._free_disk < self.policies.min_free_space):
1540 trimmed_due_to_space = True
1541 self._remove_lru_file()
1542 self._free_disk = file_path.get_free_space(self.cache_dir)
1543 if trimmed_due_to_space:
1544 total_usage = sum(self._lru.itervalues())
1545 usage_percent = 0.
1546 if total_usage:
1547 usage_percent = 100. * self.policies.max_cache_size / float(total_usage)
1548 logging.warning(
1549 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1550 'cache (%.1f%% of its maximum capacity)',
1551 self._free_disk / 1024.,
1552 total_usage / 1024.,
1553 usage_percent)
1554 self._save()
1555
1556 def _path(self, digest):
1557 """Returns the path to one item."""
1558 return os.path.join(self.cache_dir, digest)
1559
1560 def _remove_lru_file(self):
1561 """Removes the last recently used file and returns its size."""
1562 self._lock.assert_locked()
1563 digest, size = self._lru.pop_oldest()
1564 self._delete_file(digest, size)
1565 return size
1566
1567 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1568 """Adds an item into LRU cache marking it as a newest one."""
1569 self._lock.assert_locked()
1570 if size == UNKNOWN_FILE_SIZE:
1571 size = os.stat(self._path(digest)).st_size
1572 self._added.append(size)
1573 self._lru.add(digest, size)
1574
1575 def _add_oldest_list(self, digests):
1576 """Adds a bunch of items into LRU cache marking them as oldest ones."""
1577 self._lock.assert_locked()
1578 pairs = []
1579 for digest in digests:
1580 size = os.stat(self._path(digest)).st_size
1581 self._added.append(size)
1582 pairs.append((digest, size))
1583 self._lru.batch_insert_oldest(pairs)
1584
1585 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1586 """Deletes cache file from the file system."""
1587 self._lock.assert_locked()
1588 try:
1589 if size == UNKNOWN_FILE_SIZE:
1590 size = os.stat(self._path(digest)).st_size
1591 file_path.try_remove(self._path(digest))
1592 self._removed.append(size)
1593 except OSError as e:
1594 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1595
1596
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001597class IsolatedBundle(object):
1598 """Fetched and parsed .isolated file with all dependencies."""
1599
Vadim Shtayura3148e072014-09-02 18:51:52 -07001600 def __init__(self):
1601 self.command = []
1602 self.files = {}
1603 self.read_only = None
1604 self.relative_cwd = None
1605 # The main .isolated file, a IsolatedFile instance.
1606 self.root = None
1607
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001608 def fetch(self, fetch_queue, root_isolated_hash, algo):
1609 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001610
1611 It enables support for "included" .isolated files. They are processed in
1612 strict order but fetched asynchronously from the cache. This is important so
1613 that a file in an included .isolated file that is overridden by an embedding
1614 .isolated file is not fetched needlessly. The includes are fetched in one
1615 pass and the files are fetched as soon as all the ones on the left-side
1616 of the tree were fetched.
1617
1618 The prioritization is very important here for nested .isolated files.
1619 'includes' have the highest priority and the algorithm is optimized for both
1620 deep and wide trees. A deep one is a long link of .isolated files referenced
1621 one at a time by one item in 'includes'. A wide one has a large number of
1622 'includes' in a single .isolated file. 'left' is defined as an included
1623 .isolated file earlier in the 'includes' list. So the order of the elements
1624 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001625
1626 As a side effect this method starts asynchronous fetch of all data files
1627 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1628 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001629 """
1630 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1631
1632 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1633 pending = {}
1634 # Set of hashes of already retrieved items to refuse recursive includes.
1635 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001636 # Set of IsolatedFile's whose data files have already being fetched.
1637 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001638
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001639 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001640 h = isolated_file.obj_hash
1641 if h in seen:
1642 raise isolated_format.IsolatedError(
1643 'IsolatedFile %s is retrieved recursively' % h)
1644 assert h not in pending
1645 seen.add(h)
1646 pending[h] = isolated_file
1647 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1648
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001649 # Start fetching root *.isolated file (single file, not the whole bundle).
1650 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001651
1652 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001653 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001654 item_hash = fetch_queue.wait(pending)
1655 item = pending.pop(item_hash)
1656 item.load(fetch_queue.cache.read(item_hash))
Vadim Shtayura3148e072014-09-02 18:51:52 -07001657
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001658 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001659 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001660 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001661
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001662 # Always fetch *.isolated files in traversal order, waiting if necessary
1663 # until next to-be-processed node loads. "Waiting" is done by yielding
1664 # back to the outer loop, that waits until some *.isolated is loaded.
1665 for node in isolated_format.walk_includes(self.root):
1666 if node not in processed:
1667 # Not visited, and not yet loaded -> wait for it to load.
1668 if not node.is_loaded:
1669 break
1670 # Not visited and loaded -> process it and continue the traversal.
1671 self._start_fetching_files(node, fetch_queue)
1672 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001673
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001674 # All *.isolated files should be processed by now and only them.
1675 all_isolateds = set(isolated_format.walk_includes(self.root))
1676 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001677
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001678 # Extract 'command' and other bundle properties.
1679 for node in isolated_format.walk_includes(self.root):
1680 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001681 self.relative_cwd = self.relative_cwd or ''
1682
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001683 def _start_fetching_files(self, isolated, fetch_queue):
1684 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001685
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001686 Modifies self.files.
1687 """
1688 logging.debug('fetch_files(%s)', isolated.obj_hash)
1689 for filepath, properties in isolated.data.get('files', {}).iteritems():
1690 # Root isolated has priority on the files being mapped. In particular,
1691 # overridden files must not be fetched.
1692 if filepath not in self.files:
1693 self.files[filepath] = properties
1694 if 'h' in properties:
1695 # Preemptively request files.
1696 logging.debug('fetching %s', filepath)
1697 fetch_queue.add(
1698 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1699
1700 def _update_self(self, node):
1701 """Extracts bundle global parameters from loaded *.isolated file.
1702
1703 Will be called with each loaded *.isolated file in order of traversal of
1704 isolated include graph (see isolated_format.walk_includes).
1705 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001706 # Grabs properties.
1707 if not self.command and node.data.get('command'):
1708 # Ensure paths are correctly separated on windows.
1709 self.command = node.data['command']
1710 if self.command:
1711 self.command[0] = self.command[0].replace('/', os.path.sep)
1712 self.command = tools.fix_python_path(self.command)
1713 if self.read_only is None and node.data.get('read_only') is not None:
1714 self.read_only = node.data['read_only']
1715 if (self.relative_cwd is None and
1716 node.data.get('relative_cwd') is not None):
1717 self.relative_cwd = node.data['relative_cwd']
1718
1719
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001720def get_storage_api(file_or_url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001721 """Returns an object that implements low-level StorageApi interface.
1722
1723 It is used by Storage to work with single isolate |namespace|. It should
1724 rarely be used directly by clients, see 'get_storage' for
1725 a better alternative.
1726
1727 Arguments:
1728 file_or_url: a file path to use file system based storage, or URL of isolate
1729 service to use shared cloud based storage.
1730 namespace: isolate namespace to operate in, also defines hashing and
1731 compression scheme used, i.e. namespace names that end with '-gzip'
1732 store compressed data.
1733
1734 Returns:
1735 Instance of StorageApi subclass.
1736 """
Marc-Antoine Ruel37989932013-11-19 16:28:08 -05001737 if file_path.is_url(file_or_url):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001738 return IsolateServer(file_or_url, namespace)
1739 else:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001740 return FileSystem(file_or_url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001741
1742
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001743def get_storage(file_or_url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001744 """Returns Storage class that can upload and download from |namespace|.
1745
1746 Arguments:
1747 file_or_url: a file path to use file system based storage, or URL of isolate
1748 service to use shared cloud based storage.
1749 namespace: isolate namespace to operate in, also defines hashing and
1750 compression scheme used, i.e. namespace names that end with '-gzip'
1751 store compressed data.
1752
1753 Returns:
1754 Instance of Storage.
1755 """
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001756 return Storage(get_storage_api(file_or_url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001757
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001758
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001759def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001760 """Uploads the given tree to the given url.
1761
1762 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001763 base_url: The url of the isolate server to upload to.
1764 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001765 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001766 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001767 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001768 # Filter out symlinks, since they are not represented by items on isolate
1769 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001770 items = []
1771 seen = set()
1772 skipped = 0
1773 for filepath, metadata in infiles:
1774 if 'l' not in metadata and filepath not in seen:
1775 seen.add(filepath)
1776 item = FileItem(
1777 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001778 digest=metadata['h'],
1779 size=metadata['s'],
1780 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001781 items.append(item)
1782 else:
1783 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001784
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001785 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001786 with get_storage(base_url, namespace) as storage:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001787 storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001788
1789
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001790def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001791 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001792
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001793 Arguments:
1794 isolated_hash: hash of the root *.isolated file.
1795 storage: Storage class that communicates with isolate storage.
1796 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001797 outdir: Output directory to map file tree to.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001798 require_command: Ensure *.isolated specifies a command to run.
1799
1800 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001801 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001802 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001803 logging.debug(
1804 'fetch_isolated(%s, %s, %s, %s, %s)',
1805 isolated_hash, storage, cache, outdir, require_command)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001806 # Hash algorithm to use, defined by namespace |storage| is using.
1807 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001808 with cache:
1809 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001810 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001811
1812 with tools.Profiler('GetIsolateds'):
1813 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001814 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001815 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
1816 try:
1817 isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
1818 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001819 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001820 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
1821 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001822
1823 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001824 bundle.fetch(fetch_queue, isolated_hash, algo)
1825 if require_command and not bundle.command:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001826 # TODO(vadimsh): All fetch operations are already enqueue and there's no
1827 # easy way to cancel them.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001828 raise isolated_format.IsolatedError('No command to run')
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001829
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001830 with tools.Profiler('GetRest'):
1831 # Create file system hierarchy.
1832 if not os.path.isdir(outdir):
1833 os.makedirs(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001834 create_directories(outdir, bundle.files)
1835 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001836
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001837 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001838 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001839 if not os.path.isdir(cwd):
1840 os.makedirs(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001841
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001842 # Multimap: digest -> list of pairs (path, props).
1843 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001844 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001845 if 'h' in props:
1846 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001847
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001848 # Now block on the remaining files to be downloaded and mapped.
1849 logging.info('Retrieving remaining files (%d of them)...',
1850 fetch_queue.pending_count)
1851 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001852 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001853 while remaining:
1854 detector.ping()
1855
1856 # Wait for any item to finish fetching to cache.
1857 digest = fetch_queue.wait(remaining)
1858
1859 # Link corresponding files to a fetched item in cache.
1860 for filepath, props in remaining.pop(digest):
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001861 cache.hardlink(
1862 digest, os.path.join(outdir, filepath), props.get('m'))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001863
1864 # Report progress.
1865 duration = time.time() - last_update
1866 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1867 msg = '%d files remaining...' % len(remaining)
1868 print msg
1869 logging.info(msg)
1870 last_update = time.time()
1871
1872 # Cache could evict some items we just tried to fetch, it's a fatal error.
1873 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001874 raise isolated_format.MappingError(
1875 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001876 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001877
1878
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001879def directory_to_metadata(root, algo, blacklist):
1880 """Returns the FileItem list and .isolated metadata for a directory."""
1881 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001882 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001883 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001884 metadata = {
1885 relpath: isolated_format.file_to_metadata(
1886 os.path.join(root, relpath), {}, False, algo)
1887 for relpath in paths
1888 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001889 for v in metadata.itervalues():
1890 v.pop('t')
1891 items = [
1892 FileItem(
1893 path=os.path.join(root, relpath),
1894 digest=meta['h'],
1895 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001896 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001897 for relpath, meta in metadata.iteritems() if 'h' in meta
1898 ]
1899 return items, metadata
1900
1901
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001902def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001903 """Stores every entries and returns the relevant data.
1904
1905 Arguments:
1906 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001907 files: list of file paths to upload. If a directory is specified, a
1908 .isolated file is created and its hash is returned.
1909 blacklist: function that returns True if a file should be omitted.
1910 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001911 assert all(isinstance(i, unicode) for i in files), files
1912 if len(files) != len(set(map(os.path.abspath, files))):
1913 raise Error('Duplicate entries found.')
1914
1915 results = []
1916 # The temporary directory is only created as needed.
1917 tempdir = None
1918 try:
1919 # TODO(maruel): Yield the files to a worker thread.
1920 items_to_upload = []
1921 for f in files:
1922 try:
1923 filepath = os.path.abspath(f)
1924 if os.path.isdir(filepath):
1925 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001926 items, metadata = directory_to_metadata(
1927 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001928
1929 # Create the .isolated file.
1930 if not tempdir:
1931 tempdir = tempfile.mkdtemp(prefix='isolateserver')
1932 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
1933 os.close(handle)
1934 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001935 'algo':
1936 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001937 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001938 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001939 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001940 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001941 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001942 items_to_upload.extend(items)
1943 items_to_upload.append(
1944 FileItem(
1945 path=isolated,
1946 digest=h,
1947 size=os.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001948 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001949 results.append((h, f))
1950
1951 elif os.path.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001952 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001953 items_to_upload.append(
1954 FileItem(
1955 path=filepath,
1956 digest=h,
1957 size=os.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001958 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001959 results.append((h, f))
1960 else:
1961 raise Error('%s is neither a file or directory.' % f)
1962 except OSError:
1963 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001964 # Technically we would care about which files were uploaded but we don't
1965 # much in practice.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001966 _uploaded_files = storage.upload_items(items_to_upload)
1967 return results
1968 finally:
1969 if tempdir:
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001970 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001971
1972
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001973def archive(out, namespace, files, blacklist):
1974 if files == ['-']:
1975 files = sys.stdin.readlines()
1976
1977 if not files:
1978 raise Error('Nothing to upload')
1979
1980 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001981 blacklist = tools.gen_blacklist(blacklist)
1982 with get_storage(out, namespace) as storage:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001983 results = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001984 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1985
1986
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001987@subcommand.usage('<file1..fileN> or - to read from stdin')
1988def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001989 """Archives data to the server.
1990
1991 If a directory is specified, a .isolated file is created the whole directory
1992 is uploaded. Then this .isolated file can be included in another one to run
1993 commands.
1994
1995 The commands output each file that was processed with its content hash. For
1996 directories, the .isolated generated for the directory is listed as the
1997 directory entry itself.
1998 """
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001999 add_isolate_server_options(parser, False)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002000 parser.add_option(
2001 '--blacklist',
2002 action='append', default=list(DEFAULT_BLACKLIST),
2003 help='List of regexp to use as blacklist filter when uploading '
2004 'directories')
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002005 options, files = parser.parse_args(args)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002006 process_isolate_server_options(parser, options)
Vadim Shtayura6b555c12014-07-23 16:22:18 -07002007 if file_path.is_url(options.isolate_server):
2008 auth.ensure_logged_in(options.isolate_server)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002009 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002010 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002011 except Error as e:
2012 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002013 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002014
2015
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser, True)
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Exactly one of the two modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  cache = process_cache_options(options)
  options.target = os.path.abspath(options.target)

  remote = options.isolate_server or options.indir
  if file_path.is_url(remote):
    auth.ensure_logged_in(remote)

  with get_storage(remote, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      # Drain the channel until every requested digest has been written out.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            require_command=False)
        if bundle.command:
          # |rel| is already anchored at options.target; the previous code
          # joined it with options.target a second time, which os.path.join
          # ignores for an absolute second argument. Print it directly.
          rel = os.path.join(options.target, bundle.relative_cwd)
          print('To run this test please run from the directory %s:' % rel)
          print(' ' + ' '.join(bundle.command))

  return 0
2083
2084
def add_isolate_server_options(parser, add_indir):
  """Adds --isolate-server and --namespace options to |parser|.

  When |add_indir| is True, also adds the --indir option for the local
  hashtable mode.
  """
  default_server = os.environ.get('ISOLATE_SERVER', '')
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=default_server,
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
  if not add_indir:
    return
  parser.add_option(
      '--indir', metavar='DIR',
      help='Directory used to store the hashtable instead of using an '
           'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002104
2105
def process_isolate_server_options(parser, options):
  """Processes the --isolate-server and --indir options and aborts if neither is
  specified.

  On success, either options.isolate_server is normalized to a canonical URL
  (scheme defaulted to https, no trailing slash) or options.indir is turned
  into an absolute native path, depending on which option was provided.
  """
  # --indir is only defined when add_isolate_server_options() was called with
  # add_indir=True, so probe for the attribute instead of assuming it.
  has_indir = hasattr(options, 'indir')
  if not options.isolate_server:
    if not has_indir:
      parser.error('--isolate-server is required.')
    elif not options.indir:
      parser.error('Use one of --indir or --isolate-server.')
  else:
    if has_indir and options.indir:
      parser.error('Use only one of --indir or --isolate-server.')

  if options.isolate_server:
    # Second argument is the default scheme used when the user typed a bare
    # host name without 'https://'.
    parts = urlparse.urlparse(options.isolate_server, 'https')
    if parts.query:
      parser.error('--isolate-server doesn\'t support query parameter.')
    if parts.fragment:
      parser.error('--isolate-server doesn\'t support fragment in the url.')
    # urlparse('foo.com') will result in netloc='', path='foo.com', which is not
    # what is desired here.
    new = list(parts)
    if not new[1] and new[2]:
      # Move the host from the path slot (index 2) to the netloc slot
      # (index 1).
      new[1] = new[2].rstrip('/')
      new[2] = ''
    # Drop any trailing slash so server URLs compare consistently.
    new[2] = new[2].rstrip('/')
    options.isolate_server = urlparse.urlunparse(new)
    on_error.report_on_exception_exit(options.isolate_server)
    return

  # Local hashtable mode: --indir must be an existing directory, not a URL.
  if file_path.is_url(options.indir):
    parser.error('Can\'t use an URL for --indir.')
  # Normalize to an absolute path with native separators (unicode() is the
  # Python 2 builtin; this file predates Python 3).
  options.indir = unicode(options.indir).replace('/', os.path.sep)
  options.indir = os.path.abspath(
      os.path.normpath(os.path.join(os.getcwd(), options.indir)))
  if not os.path.isdir(options.indir):
    parser.error('Path given to --indir must exist.')
2144
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002145
def add_cache_options(parser):
  """Adds the local download cache options to |parser| as an option group."""
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  # The three trimming thresholds share type/metavar; add them data-driven.
  for flag, default, text in (
      ('--max-cache-size', 20*1024*1024*1024,
       'Trim if the cache gets larger than this value, default=%default'),
      ('--min-free-space', 2*1024*1024*1024,
       'Trim if disk free space becomes lower than this value, '
       'default=%default'),
      ('--max-items', 100000,
       'Trim if more than this number of items are in the cache '
       'default=%default'),
  ):
    group.add_option(flag, type='int', metavar='NNN', default=default,
                     help=text)
  parser.add_option_group(group)
2173
2174
def process_cache_options(options):
  """Returns the cache instance matching the parsed cache options.

  Returns a DiskCache rooted at options.cache when a directory was requested,
  otherwise an in-memory cache.
  """
  if not options.cache:
    return MemoryCache()
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  # |options.cache| path may not exist until DiskCache() instance is created.
  return DiskCache(
      os.path.abspath(options.cache),
      policies,
      isolated_format.get_hash_algo(options.namespace))
2187
2188
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser preloaded with logging, version and authentication flags."""

  def __init__(self, **kwargs):
    # Use this module's file name as the program name in --help output.
    prog = os.path.basename(sys.modules[__name__].__file__)
    tools.OptionParserWithLogging.__init__(
        self, version=__version__, prog=prog, **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    opts, positional = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Consume/validate the authentication flags added in __init__.
    auth.process_auth_options(self, opts)
    return opts, positional
2203
2204
def main(args):
  """Dispatches |args| to the matching subcommand and returns its exit code."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002208
2209
if __name__ == '__main__':
  # One-time process-wide console setup before dispatching to the subcommand:
  # encoding fix-up, unbuffered output and ANSI color support (colorama).
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))