#!/usr/bin/env python
# Copyright 2013 The Swarming Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0 that
# can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.4.2'

import functools
import logging
import optparse
import os
import re
import signal
import sys
import tempfile
import threading
import time
import types
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import lru
from utils import net
from utils import on_error
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to the log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server for per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# larger the possibility it has changed). Then the first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/pre-upload',
# then the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are
# a trade-off; the more per request, the lower the effect of HTTP round trip
# latency and TCP-level chattiness. On the other hand, larger values cause
# longer lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousands files to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
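# For example, with the tuple above, looking up 300 files issues queries of
# 20, 20, 50, 50, 50 and 100 items, then a tail query of 10; any later
# queries would stay at the last value (100). See batch_items_for_check()
# below for the implementation of this ramp-up.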


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
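# For instance, the patterns above match paths like 'foo.pyc', 'bar.swp',
# '.git' and 'a/b/.svn', so such files are skipped during archival.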


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    data = stream.read(chunk_size)
    if not data:
      break
    yield data


def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediate directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(filepath)
  if not os.path.isdir(filedir):
    os.makedirs(filedir)
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
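
# A minimal round-trip sketch for the two helpers above (illustrative only,
# not executed at import time):
#
#   chunks = ['hello ', 'isolate ', 'server']
#   compressed = list(zip_compress(iter(chunks)))
#   assert ''.join(zip_decompress(iter(compressed))) == ''.join(chunks)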


def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot; strip it so the extension
  # matches the entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
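
# E.g. get_zip_compression_level('archive.zip') returns 0 (already compressed
# content is stored as-is), while get_zip_compression_level('log.txt')
# returns 7.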


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Compute the full set of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      os.mkdir(os.path.join(base_directory, d))


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101


def is_valid_file(filepath, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If a digest is provided, it MUST correspond to
  the hash algorithm used by Storage.

  When used with Storage, an Item starts its life in the main thread, travels
  to the 'contains' thread, then to the 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if the
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else os.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]
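
# A minimal sketch of how an Item's digest gets computed via prepare()
# (assumes hashlib.sha1, the hash used by typical namespaces; illustrative
# only, not executed at import time):
#
#   item = BufferItem('hello')
#   item.prepare(hashlib.sha1)
#   # item.digest == hashlib.sha1('hello').hexdigest(); item.size == 5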


class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only the first Item that matches it. All other
    # items are just indistinguishable copies from the point of view of the
    # isolate server (it doesn't care about paths at all, only content and
    # digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total:      %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded
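
  # A typical end-to-end usage sketch (hypothetical server URL and path;
  # 'default-gzip' is a commonly used compressed namespace; illustrative
  # only, not executed at import time):
  #
  #   storage = Storage(
  #       IsolateServer('https://isolate.example.com', 'default-gzip'))
  #   with storage:
  #     storage.upload_items([FileItem('/tmp/out.bin')])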

  def get_fetch_url(self, item):
    """Returns a URL that can be used to fetch the given item once uploaded.

    Note that if the namespace uses compression, data at the given URL is
    compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      A URL or None if the underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with
    a 'get_missing_items' call. 'get_missing_items' returns a |push_state|
    object that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with cpu_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries
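
# A small sketch of the batching behavior (illustrative only): given 45 items
# of equal size, batch_items_for_check() yields two batches of 20 followed by
# one tail batch of 5; the tail batch is yielded even though it is smaller
# than the current limit.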


class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to the assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns the number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
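
  # A minimal usage sketch (illustrative only; assumes a LocalCache
  # implementation and the digest of an already uploaded file):
  #
  #   queue = FetchQueue(storage, cache)
  #   queue.add(digest, size=1024)
  #   queue.wait([digest])   # blocks until |digest| lands in |cache|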


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to LocalCache."""

  def __init__(self, stream, expected_size):
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions
    since otherwise Storage will retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch an item with the given digest.

    Arguments:
      digest: hex digest of item to fetch.

    Returns:
      A URL or None if the protocol doesn't support this.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch
          from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by the |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by the 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See the doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, upload_url, finalize_url, size):
    self.upload_url = upload_url
    self.finalize_url = finalize_url
    self.uploaded = False
    self.finalized = False
    self.size = size


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise isolated_format.MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps
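
  # A successful handshake response looks roughly like this (sketch; the
  # field set is inferred from the validation above, the values are
  # hypothetical):
  #
  #   {
  #     'access_token': '...',
  #     'protocol_version': '1.0',
  #     'server_app_version': '1412-abcdef0',
  #   }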

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if the server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.
    with self._lock:
      if self._server_caps is None:
        try:
          caps = net.url_read_json(
              url=self._base_url + '/content-gs/handshake',
              data=self._generate_handshake_request())
          if caps is None:
            raise isolated_format.MappingError('Failed to perform handshake.')
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print the exception class
          # name as well.
          raise isolated_format.MappingError(
              'Invalid handshake response (%s): %s' % (
              exc.__class__.__name__, exc))
    return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def get_fetch_url(self, digest):
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    connection = self.do_fetch(source_url, offset)
    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify the server respects it by checking
    # Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure the returned offset equals the requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure the entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)
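
  # Content-Range sketch: when resuming at offset=100 of a 1024-byte file,
  # the server is expected to reply with:
  #
  #   Content-Range: bytes 100-1023/1024
  #
  # or 'bytes 100-1023/*' when the total size is not known in advance.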

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    if isinstance(content, (basestring, list)):
      # Memory is already used, too late.
      with self._lock:
        self._memory_use += push_state.size
    else:
      # TODO(vadimsh): Do not read from |content| generator when retrying push.
      # If |content| is indeed a generator, it can not be rewound to the
      # beginning of the stream. A retry will find it exhausted. A possible
      # solution is to wrap |content| generator with some sort of caching
      # restartable generator. It should be done alongside streaming support
      # implementation.
      #
      # In theory, we should keep the generator, so that it is not serialized
      # in memory. Sadly net.HttpService.request() requires the body to be
      # serialized.
      assert isinstance(content, types.GeneratorType), repr(content)
      slept = False
      # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
      # One byte less than 1gb. This is to cope with incompressible content.
      max_size = int(sys.maxsize * 0.5)
      while True:
        with self._lock:
          # This is due to 32 bits python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since it's what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling one upload at once, we can survive. Otherwise, kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below, no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state.upload_url, content)
        if not success:
          raise IOError('Failed to upload a file %s to %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read(
            url=push_state.finalize_url,
            data='',
            content_type='application/json',
            method='POST')
        if response is None:
          raise IOError('Failed to finalize an upload of %s' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size
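
  # A worked example of the throttle above (illustrative): on 32-bit Python
  # sys.maxsize is 2**31 - 1, so max_size is 2**30 - 1 (~1gb). Two concurrent
  # 600mb generator-backed pushes therefore serialize: the second one spins in
  # the sleep loop until the first returns its budget in the finally clause.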

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a JSON-encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.high_priority),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self._base_url,
        self._namespace,
        urllib.quote(self._server_capabilities['access_token']))

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute /pre-upload query')
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick the Items that are missing, attach _IsolateServerPushState to them.
    missing_items = {}
    for i, push_urls in enumerate(response):
      if push_urls:
        assert len(push_urls) == 2, str(push_urls)
        missing_items[items[i]] = _IsolateServerPushState(
            push_urls[0], push_urls[1], items[i].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001174
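  # Illustrative /pre-upload exchange implied by contains() above (the hashes
  # are placeholders, not real digests):
  #
  #   request body:  [{'h': 'aaaa...', 's': 1024, 'i': 0},
  #                   {'h': 'bbbb...', 's': 4096, 'i': 1}]
  #   response body: [null,
  #                   [<upload_url>, <finalize_url>]]
  #
  # The first item is already stored server-side; the second is missing and
  # gets the URL pair that push() consumes.
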
  def do_fetch(self, url, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    return net.url_open(
        url,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

  def do_push(self, url, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to upload the data to.
      content: an iterable that yields 'str' chunks.

    Returns:
      True on success, False on failure.
    """
    # A cheap way to avoid a memcpy of the (possibly huge) file, until
    # streaming upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)
    response = net.url_read(
        url=url,
        data=content,
        content_type='application/octet-stream',
        method='PUT')
    return response is not None


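# A minimal sketch of how a custom StorageApi implementation could be swapped
# in via set_storage_api_class() defined further below. The subclass and the
# mirror URL are hypothetical, purely for illustration:
#
#   class MirroringIsolateServer(IsolateServer):
#     def do_push(self, url, content):
#       data = ''.join(content)
#       # Best-effort copy to a secondary store; the result is ignored.
#       net.url_read(
#           url='https://mirror.example.com/store',
#           data=data,
#           content_type='application/octet-stream',
#           method='PUT')
#       return super(MirroringIsolateServer, self).do_push(url, [data])
#
#   set_storage_api_class(MirroringIsolateServer)

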
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has the same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. The default value makes
          all mapped files read-only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # Let's not assume dict is thread safe.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Assemble the whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no file node to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      os.chmod(dest, file_mode & self._file_mode_mask)


class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0,
      the cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value.
      If 0, it unconditionally fills the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items


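# Example: policies matching the defaults of the --max-cache-size,
# --min-free-space and --max-items options defined further below.
#
#   policies = CachePolicies(
#       max_cache_size=20*1024*1024*1024,  # 20 GiB
#       min_free_space=2*1024*1024*1024,   # 2 GiB
#       max_items=100000)

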
class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as a json file.
  """
  STATE_FILE = 'state.json'

  def __init__(self, cache_dir, policies, hash_algo):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      hash_algo: hashing algorithm used.
    """
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)

    # All protected methods (starting with '_') except _path should be called
    # with this lock locked.
    self._lock = threading_utils.LockWithAssert()
    self._lru = lru.LRUDict()

    # Profiling values.
    self._added = []
    self._removed = []
    self._free_disk = 0

    with tools.Profiler('Setup'):
      with self._lock:
        self._load()

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) removed',
          len(self._removed), sum(self._removed) / 1024)
      logging.info(
          '       %8dkb free',
          self._free_disk / 1024)
    return False

  def cached_set(self):
    with self._lock:
      return self._lru.keys_set()

  def touch(self, digest, size):
    """Verifies that an actual file is valid.

    Note that it doesn't compute the hash, so the file could still be
    corrupted if its size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
    return True

  def evict(self, digest):
    with self._lock:
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def read(self, digest):
    with open(self._path(digest), 'rb') as f:
      return f.read()

  def write(self, digest, content):
    path = self._path(digest)
    # A stale broken file may remain, possibly with its write access bit
    # removed, which would cause the file_write() call below to fail to open
    # it in write mode. Take no chances here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      # 1) Inside the |content| generator, in case of network or unzipping
      #    errors.
      # 2) Inside file_write itself, in case of disk IO errors.
      # In either case delete the incomplete file and propagate the exception
      # to the caller; it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry pointing to this
    # file becomes read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)

  def hardlink(self, digest, dest, file_mode):
    """Hardlinks the file to |dest|.

    Note that the file permission bits are on the file node, not the directory
    entry, so changing the access bit on any of the directory entries for the
    file node will affect them all.
    """
    path = self._path(digest)
    # TODO(maruel): file_path.HARDLINK_WITH_FALLBACK ?
    file_path.hardlink(path, dest)
    if file_mode is not None:
      # Ignores all other bits.
      os.chmod(dest, file_mode & 0500)

  def _load(self):
    """Loads the state of the cache from the json file."""
    self._lock.assert_locked()

    if not os.path.isdir(self.cache_dir):
      os.makedirs(self.cache_dir)
    else:
      # Make sure the cache is read-only.
      # TODO(maruel): Calculate the cost and optimize the performance
      # accordingly.
      file_path.make_tree_read_only(self.cache_dir)

    # Load the state of the cache.
    if os.path.isfile(self.state_file):
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s', err)
        # Don't want to keep a broken state file.
        file_path.try_remove(self.state_file)

    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    unknown = []
    for filename in os.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        continue
      if filename in previous:
        previous.remove(filename)
        continue
      # An untracked file.
      if not isolated_format.is_valid_hash(filename, self.hash_algo):
        logging.warning('Removing unknown file %s from cache', filename)
        file_path.try_remove(self._path(filename))
        continue
      # A file that's not referenced in 'state.json'.
      # TODO(vadimsh): Verify that its SHA1 matches the file name.
      logging.warning('Adding unknown file %s to cache', filename)
      unknown.append(filename)

    if unknown:
      # Add as the oldest files. They will be deleted eventually if not
      # accessed.
      self._add_oldest_list(unknown)
      logging.warning('Added back %d unknown files', len(unknown))

    if previous:
      # Filter out entries that were not found.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)
    self._trim()

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if os.path.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if os.path.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know and ensures enough free space exists."""
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file()

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file()

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = False
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space = True
      self._remove_lru_file()
      self._free_disk = file_path.get_free_space(self.cache_dir)
    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if self.policies.max_cache_size:
        usage_percent = (
            100. * total_usage / float(self.policies.max_cache_size))
      logging.warning(
          'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
          'cache (%.1f%% of its maximum capacity)',
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent)
    self._save()

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    digest, size = self._lru.pop_oldest()
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into the LRU cache, marking it as the newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = os.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)

  def _add_oldest_list(self, digests):
    """Adds a bunch of items into the LRU cache, marking them as the oldest."""
    self._lock.assert_locked()
    pairs = []
    for digest in digests:
      size = os.stat(self._path(digest)).st_size
      self._added.append(size)
      pairs.append((digest, size))
    self._lru.batch_insert_oldest(pairs)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes a cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        size = os.stat(self._path(digest)).st_size
      file_path.try_remove(self._path(digest))
      self._removed.append(size)
    except OSError as e:
      logging.error('Error attempting to delete a file %s:\n%s', digest, e)


class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left side of the tree have been fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts an asynchronous fetch of all data
    files by adding them to |fetch_queue|. It doesn't wait for the data files
    to finish fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching the root *.isolated file (a single file, not the whole
    # bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, which waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now, and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

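  # Sketch of the override semantics implemented by _start_fetching_files()
  # below (file names are placeholders): if a.isolated includes b.isolated and
  # both list a file 'f', the embedding a.isolated is walked first, so only
  # its version of 'f' is recorded and fetched; b.isolated's entry is ignored.
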
  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # The root isolated has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on Windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def set_storage_api_class(cls):
  """Replaces the StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls


def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of the isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines the hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)


def get_storage(url, namespace):
  """Returns a Storage instance that can upload and download from |namespace|.

  Arguments:
    url: URL of the isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines the hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(get_storage_api(url, namespace))


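# Typical lifecycle (a sketch; the server URL is a placeholder):
#
#   with get_storage('https://isolateserver.example.com', 'default-gzip') as s:
#     s.upload_items(items)  # items is a list of FileItem instances.

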
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into a list of FileItem objects, skip duplicates.
  # Filter out symlinks, since they are not represented by items on the
  # isolate server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)


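# Shape of one |infiles| entry consumed above (a sketch; the digest is a
# placeholder): a pair of absolute path and metadata dict, where 'h' is the
# content hash, 's' the size, 'l' marks symlinks and 'priority' == '0' marks
# high priority items.
#
#   ('/abs/path/foo.txt', {'h': 'deadbeef...', 's': 12, 'priority': '0'})

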
def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with the isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map the file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        try:
          isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading the rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)
      if require_command and not bundle.command:
        # TODO(vadimsh): All fetch operations are already enqueued and there's
        # no easy way to cancel them.
        raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create the file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure the working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link the corresponding files to the fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # The cache could have evicted some items we just tried to fetch; that's
    # a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle


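# Example call (a sketch; hash, URL and output directory are placeholders),
# similar to what CMDdownload below does:
#
#   bundle = fetch_isolated(
#       isolated_hash='deadbeef...',
#       storage=get_storage('https://isolateserver.example.com',
#                           'default-gzip'),
#       cache=MemoryCache(),
#       outdir='/tmp/out',
#       require_command=False)

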
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
    relpath: isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
      FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          high_priority=relpath.endswith('.isolated'))
      for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded, but in
    # practice we don't much.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    if tempdir:
      file_path.rmtree(tempdir)


def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, files, blacklist)
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


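# Example invocations (a sketch; the server URL is a placeholder):
#
#   isolateserver.py archive -I https://isolateserver.example.com foo dir1
#   echo foo | isolateserver.py archive -I https://isolateserver.example.com -

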
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0


def CMDdownload(parser, args):
  """Downloads data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, '
           'use --file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append',
      nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  cache = process_cache_options(options)
  options.target = os.path.abspath(options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching a whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            require_command=False)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print('  ' + ' '.join(bundle.command))

  return 0


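# Example invocations (a sketch; URL, hash and paths are placeholders):
#
#   isolateserver.py download -I https://isolateserver.example.com \
#       -s deadbeef... -t /tmp/out
#   isolateserver.py download -I https://isolateserver.example.com \
#       -f deadbeef... local_name

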
def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as a blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to the parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, it '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(parser, options, set_exception_handler):
  """Processes the --isolate-server option and aborts if not specified.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    parser.error('--isolate-server is required.')
  try:
    options.isolate_server = net.fix_url(options.isolate_server)
  except ValueError as e:
    parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates '
           'download by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=20*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # The |options.cache| path may not exist until a DiskCache() instance is
    # created.
    return DiskCache(
        os.path.abspath(options.cache),
        policies,
        isolated_format.get_hash_algo(options.namespace))
  else:
    return MemoryCache()


class OptionParserIsolateServer(tools.OptionParserWithLogging):
  def __init__(self, **kwargs):
    tools.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))