blob: 623ac233b4863ee2395ee8522b4150e173bbf2b0 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04008__version__ = '0.3.4'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000010import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000011import logging
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000013import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040014import signal
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000015import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050016import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000017import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import time
maruel@chromium.orge82112e2013-04-24 14:41:55 +000019import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050020import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000021import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000022
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000023from third_party import colorama
24from third_party.depot_tools import fix_encoding
25from third_party.depot_tools import subcommand
26
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050027from utils import file_path
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040028from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000029from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040030from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000031from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000032from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000033
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080034import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040035import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080036
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000037
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of relative file paths that should never be archived.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# Chromium-specific.
DEFAULT_BLACKLIST += (
    r'^.+\.(?:run_test_cases)$',
    r'^(?:.+' + re.escape(os.path.sep) + r'|)testserver\.log$',
)
103
104
class Error(Exception):
  """Generic runtime error raised by this module."""
108
109
class Aborted(Error):
  """Raised when an in-flight operation was cancelled before completion."""
113
114
def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  # Prime the first read, then keep going until the stream runs dry (an empty
  # read marks end-of-stream).
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
122
123
def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|.

  Arguments:
    filepath: path of the file to read.
    chunk_size: maximum size of a single yielded chunk, in bytes.
    offset: byte offset to seek to before reading anything.
  """
  with open(filepath, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data
134
135
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.

  Arguments:
    filepath: destination path; parent directories are created if missing.
    content_generator: iterable of byte string chunks to write.
  """
  filedir = os.path.dirname(filepath)
  # |filedir| is '' when writing to the current directory; os.makedirs('')
  # would raise, so skip directory creation in that case.
  if filedir and not os.path.isdir(filedir):
    try:
      os.makedirs(filedir)
    except OSError:
      # Another thread or process may have created the directory between the
      # isdir() check and makedirs(); only propagate if it still isn't there.
      if not os.path.isdir(filedir):
        raise
  total = 0
  with open(filepath, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000154
155
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  deflater = zlib.compressobj(level)
  for raw_chunk in content_generator:
    packed = deflater.compress(raw_chunk)
    # The compressor may buffer input internally and emit nothing yet.
    if packed:
      yield packed
  # Emit whatever the compressor still buffers and finalize the stream.
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
166
167
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # decompress() stops once it has produced |chunk_size| output bytes and
      # parks leftover input in unconsumed_tail; keep draining until the input
      # chunk is fully consumed.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
198
199
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot ('.png') while entries in
  # ALREADY_COMPRESSED_TYPES are bare extensions ('png'); strip it before the
  # membership test, otherwise no filename ever matched and already-compressed
  # files were needlessly recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
205
206
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file, relative to the base.
  required = set()
  for f in files:
    parent = os.path.dirname(f)
    while parent and parent not in required:
      required.add(parent)
      parent = os.path.dirname(parent)
  # Lexicographic order guarantees a parent sorts before its children, so
  # plain mkdir() is safe.
  for rel_dir in sorted(required):
    os.mkdir(os.path.join(base_directory, rel_dir))
219
220
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for relpath, props in files:
    # Only entries carrying an 'l' (link target) property are symlinks.
    if 'l' not in props:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
      continue
    destination = os.path.join(base_directory, relpath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(props['l'], destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000233
234
def is_valid_file(filepath, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  # When the expected size is unknown, existence is the only check possible.
  if size == UNKNOWN_FILE_SIZE:
    return os.path.isfile(filepath)
  actual_size = os.stat(filepath).st_size
  if size == actual_size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(filepath), actual_size, size)
  return False
249
250
class Item(object):
  """A single piece of content that can be pushed to Storage.

  The digest and size may be supplied up-front when already known; otherwise
  prepare() derives both from content(). A caller-supplied digest MUST have
  been produced with the hash algorithm the Storage uses.

  Within Storage an instance is handed from the main thread to the 'contains'
  thread, then to the 'push' thread and finally back to the main thread, but
  it is never touched by two threads at once.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Hex digest of the content, or None until prepare() computes it.
    self.digest = digest
    # Content size in bytes, or None until prepare() computes it.
    self.size = size
    # High priority items are submitted ahead of normal ones.
    self.high_priority = high_priority
    # Default zlib level; subclasses may tune it per item.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks.

    Subclasses must override this.
    """
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Fills in self.digest and self.size when either is still unknown.

    Streams content() through a fresh |hash_algo| instance to compute them.
    No-op when both are already set.

    Arguments:
      hash_algo: hash constructor used to calculate the digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    byte_count = 0
    for chunk in self.content():
      hasher.update(chunk)
      byte_count += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = byte_count
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000290
291
292class FileItem(Item):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800293 """A file to push to Storage.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000294
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800295 Its digest and size may be provided in advance, if known. Otherwise they will
296 be derived from the file content.
297 """
298
299 def __init__(self, path, digest=None, size=None, high_priority=False):
300 super(FileItem, self).__init__(
301 digest,
302 size if size is not None else os.stat(path).st_size,
303 high_priority)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000304 self.path = path
305 self.compression_level = get_zip_compression_level(path)
306
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800307 def content(self):
308 return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000309
310
class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Size is known immediately (len(buf)); digest is left to prepare().
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    # A list (not a generator) so the content can be iterated more than once.
    return [self.buffer]
320
321
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    # Namespace determines whether payloads are zlib-compressed and which hash
    # algorithm names the items.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily via the corresponding properties.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Set by abort(); read without locking (see abort() for rationale).
    self._aborted = False
    # Signal handlers saved by __enter__ and restored by __exit__.
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """Location of a backing store that this class is using.

    Exact meaning depends on the storage_api type. For IsolateServer it is
    an URL of isolate server, for FileSystem is it a path in file system.
    """
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      self._cpu_thread_pool = threading_utils.ThreadPool(
          2, max(threading_utils.num_processors(), 2), 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly theadsafe, but in the worst case the logging message
    # will be printed twice. Not a big deal. In other places it is assumed that
    # unprotected reads and writes to _aborted are serializable (it is true
    # for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    # Route Ctrl+C / SIGTERM to abort(), remembering the previous handlers so
    # __exit__ can restore them.
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def get_fetch_url(self, item):
    """Returns an URL that can be used to fetch given item once it's uploaded.

    Note that if namespace uses compression, data at given URL is compressed.

    Arguments:
      item: Item to get fetch URL for.

    Returns:
      An URL or None if underlying protocol doesn't support this.
    """
    item.prepare(self._hash_algo)
    return self._storage_api.get_fetch_url(item.digest)

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
          describing how to upload the item (for example in case of cloud
          storage, it is signed upload URLs). It can later be passed to
          'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000651
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000652
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batches_done = 0
  limit = ITEMS_PER_CONTAINS_QUERIES[0]
  current = []
  # Largest items go out first so their queries start as early as possible.
  for entry in sorted(items, key=lambda i: i.size, reverse=True):
    current.append(entry)
    if len(current) == limit:
      yield current
      batches_done += 1
      current = []
      # Batch sizes ramp up along ITEMS_PER_CONTAINS_QUERIES; once the table
      # is exhausted the last configured size is reused.
      limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batches_done, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if current:
    yield current
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000679
680
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    """
    Arguments:
      storage: Storage instance used to start asynchronous fetches.
      cache: LocalCache instance that receives the fetched content.
    """
    self.storage = storage
    self.cache = cache
    # Receives a digest every time the corresponding fetch task completes.
    self._channel = threading_utils.TaskChannel()
    # Digests whose fetch has started but not finished yet.
    self._pending = set()
    # Every digest ever passed to 'add'; checked by verify_all_cached.
    self._accessed = set()
    # Digests already present in |cache| (grows as fetches complete).
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      # Each completed fetch task posts its digest to the channel (see the
      # fetch closure in Storage.async_fetch, which returns the digest).
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage.

    Arguments:
      path: path of the local file to read.
      algo: hashlib-style hash factory; algo(data).hexdigest() names the item.

    Returns:
      Hex digest under which the file content was stored.
    """
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
776
777
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Re-yields |stream| chunks, validating the stream before the last one.

    The final chunk is held back until the stream is known to be complete, so
    the consumer never commits data from a truncated download.

    IOError raised by the consumer is converted into MappingError, otherwise
    Storage would retry the fetch for what is really a local cache problem.
    """
    # Look-ahead by one chunk: |held| is only released once its successor (or
    # end-of-stream) has been observed.
    source = iter(self.stream)
    try:
      held = next(source)
    except StopIteration:
      return
    assert held is not None
    for incoming in source:
      assert incoming is not None
      self._inspect_chunk(held, is_last=False)
      try:
        yield held
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)
      held = incoming
    self._inspect_chunk(held, is_last=True)
    try:
      yield held
    except IOError as exc:
      raise isolated_format.MappingError(
          'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if not is_last:
      return
    if self.expected_size == UNKNOWN_FILE_SIZE:
      return
    if self.expected_size != self.current_size:
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
824
825
class StorageApi(object):
  """Abstract interface for low-level storage backends.

  Implementations know nothing about compression or hashing schemes; those
  concerns live in the higher level Storage class.

  Callers should normally go through Storage rather than use a StorageApi
  directly, since Storage layers compression and upload optimizations on top.
  """

  @property
  def location(self):
    """Location of the backing store used by this instance.

    The exact meaning depends on the implementation: for IsolateServer it is
    the URL of the isolate server, for FileSystem it is a file system path.
    """
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace this storage operates in.

    The namespace indirectly determines the hashing scheme and compression
    method in use.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns a URL that can be used to fetch the item with given digest.

    Arguments:
      digest: hex digest of the item to fetch.

    Returns:
      A URL string, or None when the protocol has no notion of URLs.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content in chunks.

    Arguments:
      digest: hash digest of the item to download.
      offset: byte offset from the start of the file to resume the fetch from.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item|, optionally using |content| as the data source.

    |item| MUST first go through a 'contains' call to obtain |push_state|
    before it can be pushed to the storage.

    Typical usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression enabled, the bytes to upload
    differ from the bytes the item itself provides. In that case |content| is
    not None and yields chunks of compressed data (derived from
    item.content()). The Storage class implements that wrapping.

    Arguments:
      item: Item object describing what is being pushed.
      push_state: push state object as returned by the 'contains' call.
      content: generator yielding chunks to push; item.content() when None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks which of |items| are absent and prepares them for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      Dict mapping each missing Item to an opaque push state object suitable
      for passing to 'push'. See the 'push' doc string.
    """
    raise NotImplementedError()
914
915
class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, upload_url, finalize_url):
    # URL to PUT the raw content to, and optional URL to POST to afterwards.
    self.upload_url, self.finalize_url = upload_url, finalize_url
    # Flipped to True by 'push' as the upload and finalization succeed.
    self.uploaded = self.finalized = False
927
928
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert base_url.startswith('http'), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    # Guards lazy initialization of |_server_caps| in _server_capabilities.
    self._lock = threading.Lock()
    # Cached /handshake response; None until the first handshake is performed.
    self._server_caps = None

  @staticmethod
  def _generate_handshake_request():
    """Returns a dict to be sent as handshake request body."""
    # TODO(vadimsh): Set 'pusher' and 'fetcher' according to intended usage.
    return {
        'client_app_version': __version__,
        'fetcher': True,
        'protocol_version': ISOLATE_PROTOCOL_VERSION,
        'pusher': True,
    }

  @staticmethod
  def _validate_handshake_response(caps):
    """Validates and normalizes handshake response."""
    logging.info('Protocol version: %s', caps['protocol_version'])
    logging.info('Server version: %s', caps['server_app_version'])
    if caps.get('error'):
      raise isolated_format.MappingError(caps['error'])
    if not caps['access_token']:
      raise ValueError('access_token is missing')
    return caps

  @property
  def _server_capabilities(self):
    """Performs handshake with the server if not yet done.

    The result is cached in |_server_caps|, so the handshake happens at most
    once per instance.

    Returns:
      Server capabilities dictionary as returned by /handshake endpoint.

    Raises:
      MappingError if server rejects the handshake.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.
    with self._lock:
      if self._server_caps is None:
        try:
          caps = net.url_read_json(
              url=self._base_url + '/content-gs/handshake',
              data=self._generate_handshake_request())
          if caps is None:
            raise isolated_format.MappingError('Failed to perform handshake.')
          if not isinstance(caps, dict):
            raise ValueError('Expecting JSON dict')
          self._server_caps = self._validate_handshake_response(caps)
        except (ValueError, KeyError, TypeError) as exc:
          # KeyError exception has very confusing str conversion: it's just a
          # missing key value and nothing else. So print exception class name
          # as well.
          raise isolated_format.MappingError(
              'Invalid handshake response (%s): %s' % (
                  exc.__class__.__name__, exc))
      return self._server_caps

  @property
  def location(self):
    """URL of the isolate server. See StorageApi.location."""
    return self._base_url

  @property
  def namespace(self):
    """Isolate namespace in use. See StorageApi.namespace."""
    return self._namespace

  def get_fetch_url(self, digest):
    """Returns the /content-gs/retrieve URL for |digest|.

    See StorageApi.get_fetch_url.
    """
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    """Streams an item's content over HTTP. See StorageApi.fetch."""
    source_url = self.get_fetch_url(digest)
    logging.debug('download_file(%s, %d)', source_url, offset)

    # A non-zero |offset| resumes the download via an HTTP Range request.
    connection = net.url_open(
        source_url,
        read_timeout=DOWNLOAD_READ_TIMEOUT,
        headers={'Range': 'bytes=%d-' % offset} if offset else None)

    if not connection:
      raise IOError('Request failed - %s' % source_url)

    # If |offset| is used, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    """Uploads |item|, then finalizes the upload. See StorageApi.push.

    Safe to retry: the upload PUT is skipped when |push_state.uploaded| shows
    a prior attempt already stored the content and only finalization failed.
    """
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content

    # Do not iterate byte by byte over 'str'. Push it all as a single chunk.
    if isinstance(content, basestring):
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]

    # TODO(vadimsh): Do not read from |content| generator when retrying push.
    # If |content| is indeed a generator, it can not be re-winded back
    # to the beginning of the stream. A retry will find it exhausted. A possible
    # solution is to wrap |content| generator with some sort of caching
    # restartable generator. It should be done alongside streaming support
    # implementation.

    # This push operation may be a retry after failed finalization call below,
    # no need to reupload contents in that case.
    if not push_state.uploaded:
      # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
      # upload support is implemented.
      if isinstance(content, list) and len(content) == 1:
        content = content[0]
      else:
        content = ''.join(content)
      # PUT file to |upload_url|.
      response = net.url_read(
          url=push_state.upload_url,
          data=content,
          content_type='application/octet-stream',
          method='PUT')
      if response is None:
        raise IOError('Failed to upload a file %s to %s' % (
            item.digest, push_state.upload_url))
      push_state.uploaded = True
    else:
      logging.info(
          'A file %s already uploaded, retrying finalization only', item.digest)

    # Optionally notify the server that it's done.
    if push_state.finalize_url:
      # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
      # send it to isolated server. That way isolate server can verify that
      # the data safely reached Google Storage (GS provides MD5 and CRC32C of
      # stored files).
      # TODO(maruel): Fix the server to accept propery data={} so
      # url_read_json() can be used.
      response = net.url_read(
          url=push_state.finalize_url,
          data='',
          content_type='application/json',
          method='POST')
      if response is None:
        raise IOError('Failed to finalize an upload of %s' % item.digest)
      push_state.finalized = True

  def contains(self, items):
    """Batch-queries the /pre-upload endpoint. See StorageApi.contains.

    Raises:
      MappingError when the query fails or the response is malformed.
    """
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = [
        {
          'h': item.digest,
          's': item.size,
          'i': int(item.high_priority),
        } for item in items
    ]

    query_url = '%s/content-gs/pre-upload/%s?token=%s' % (
        self._base_url,
        self._namespace,
        urllib.quote(self._server_capabilities['access_token']))

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute /pre-upload query')
      if not isinstance(response, list):
        raise ValueError('Expecting response with json-encoded list')
      if len(response) != len(items):
        raise ValueError(
            'Incorrect number of items in the list, expected %d, '
            'but got %d' % (len(items), len(response)))
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for i, push_urls in enumerate(response):
      if push_urls:
        # Expected to be a (upload_url, finalize_url) pair.
        assert len(push_urls) == 2, str(push_urls)
        missing_items[items[i]] = _IsolateServerPushState(
            push_urls[0], push_urls[1])
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001163
1164
class FileSystem(StorageApi):
  """StorageApi implementation that reads and writes items on a file system.

  The common use case is a NFS/CIFS file server that is mounted locally and
  used to fetch files onto a local partition.
  """

  # Sentinel push_state value: forces callers through 'contains' before
  # 'push'. Naively passing None to 'push' trips its assert.
  _DUMMY_PUSH_STATE = object()

  def __init__(self, base_path, namespace):
    super(FileSystem, self).__init__()
    self._base_path = base_path
    self._namespace = namespace

  @property
  def location(self):
    """Root directory holding the items. See StorageApi.location."""
    return self._base_path

  @property
  def namespace(self):
    """Isolate namespace in use. See StorageApi.namespace."""
    return self._namespace

  def get_fetch_url(self, digest):
    # A plain file system has no URL scheme.
    return None

  def fetch(self, digest, offset=0):
    """Streams the file named after |digest|. See StorageApi.fetch."""
    assert isinstance(digest, basestring)
    path = os.path.join(self._base_path, digest)
    return file_read(path, offset=offset)

  def push(self, item, push_state, content=None):
    """Writes |item|'s content to a file named after its digest."""
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert push_state is self._DUMMY_PUSH_STATE
    if content is None:
      content = item.content()
    if isinstance(content, basestring):
      # Do not iterate byte by byte over 'str'; write it as one chunk.
      assert not isinstance(content, unicode), 'Unicode string is not allowed'
      content = [content]
    file_write(os.path.join(self._base_path, item.digest), content)

  def contains(self, items):
    """Reports items whose backing file does not exist yet."""
    assert all(i.digest is not None and i.size is not None for i in items)
    return {
        entry: self._DUMMY_PUSH_STATE
        for entry in items
        if not os.path.exists(os.path.join(self._base_path, entry.digest))
    }
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001213
1214
class LocalCache(object):
  """Local cache holding objects fetched through Storage.

  Instances may be used from several threads at once, so implementations must
  guard their internal state with a lock.
  """
  # Directory backing the cache, when the implementation uses one.
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a new set with the digest of every cached item."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Checks item integrity and refreshes its LRU position.

    Arguments:
      digest: hash digest of the item to check.
      size: expected size of this item.

    Returns:
      True when the item is present in the cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes the item from the cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns the cached item's content as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Consumes the |content| generator and stores the data in the cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures the file at |dest| matches the cached content of |digest|.

    When file_mode is provided it is used to set permissions (e.g. the
    executable bit) on |dest| where applicable.
    """
    raise NotImplementedError()
1266
1267
class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0o500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. Default value will make
          all mapped files to be read only.
    """
    # NOTE: 0o500 replaces the legacy literal 0500 (same value, 320): the old
    # 0-prefix octal form is a SyntaxError on Python 3, while 0o works on
    # Python 2.6+ and 3 alike (PEP 3127).
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # Let's not assume dict is thread safe.
    self._lock = threading.Lock()
    # Maps digest -> content bytes.
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    # Memory cannot get corrupted, so presence implies validity; |size| is
    # intentionally ignored.
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    # Raises KeyError for unknown digests.
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no filenode to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      # Apply the mask so mapped files default to read-only.
      os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001309
1310
class CachePolicies(object):
  """Knobs controlling when a local cache trims its content.

  Attributes:
    max_cache_size: trim if the cache grows past this many bytes. If 0, the
        cache is effectively a leak.
    min_free_space: trim if free disk space drops below this value. If 0,
        unconditionally fill the disk.
    max_items: maximum number of items to keep. If 0, do not enforce a limit.
  """

  def __init__(self, max_cache_size, min_free_space, max_items):
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
1325
1326
1327class DiskCache(LocalCache):
1328 """Stateful LRU cache in a flat hash table in a directory.
1329
1330 Saves its state as json file.
1331 """
1332 STATE_FILE = 'state.json'
1333
1334 def __init__(self, cache_dir, policies, hash_algo):
1335 """
1336 Arguments:
1337 cache_dir: directory where to place the cache.
1338 policies: cache retention policies.
1339 algo: hashing algorithm used.
1340 """
1341 super(DiskCache, self).__init__()
1342 self.cache_dir = cache_dir
1343 self.policies = policies
1344 self.hash_algo = hash_algo
1345 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
1346
1347 # All protected methods (starting with '_') except _path should be called
1348 # with this lock locked.
1349 self._lock = threading_utils.LockWithAssert()
1350 self._lru = lru.LRUDict()
1351
1352 # Profiling values.
1353 self._added = []
1354 self._removed = []
1355 self._free_disk = 0
1356
1357 with tools.Profiler('Setup'):
1358 with self._lock:
1359 self._load()
1360
1361 def __enter__(self):
1362 return self
1363
1364 def __exit__(self, _exc_type, _exec_value, _traceback):
1365 with tools.Profiler('CleanupTrimming'):
1366 with self._lock:
1367 self._trim()
1368
1369 logging.info(
1370 '%5d (%8dkb) added',
1371 len(self._added), sum(self._added) / 1024)
1372 logging.info(
1373 '%5d (%8dkb) current',
1374 len(self._lru),
1375 sum(self._lru.itervalues()) / 1024)
1376 logging.info(
1377 '%5d (%8dkb) removed',
1378 len(self._removed), sum(self._removed) / 1024)
1379 logging.info(
1380 ' %8dkb free',
1381 self._free_disk / 1024)
1382 return False
1383
1384 def cached_set(self):
1385 with self._lock:
1386 return self._lru.keys_set()
1387
1388 def touch(self, digest, size):
1389 """Verifies an actual file is valid.
1390
1391 Note that is doesn't compute the hash so it could still be corrupted if the
1392 file size didn't change.
1393
1394 TODO(maruel): More stringent verification while keeping the check fast.
1395 """
1396 # Do the check outside the lock.
1397 if not is_valid_file(self._path(digest), size):
1398 return False
1399
1400 # Update it's LRU position.
1401 with self._lock:
1402 if digest not in self._lru:
1403 return False
1404 self._lru.touch(digest)
1405 return True
1406
1407 def evict(self, digest):
1408 with self._lock:
1409 self._lru.pop(digest)
1410 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1411
1412 def read(self, digest):
1413 with open(self._path(digest), 'rb') as f:
1414 return f.read()
1415
1416 def write(self, digest, content):
1417 path = self._path(digest)
1418 # A stale broken file may remain. It is possible for the file to have write
1419 # access bit removed which would cause the file_write() call to fail to open
1420 # in write mode. Take no chance here.
1421 file_path.try_remove(path)
1422 try:
1423 size = file_write(path, content)
1424 except:
1425 # There are two possible places were an exception can occur:
1426 # 1) Inside |content| generator in case of network or unzipping errors.
1427 # 2) Inside file_write itself in case of disk IO errors.
1428 # In any case delete an incomplete file and propagate the exception to
1429 # caller, it will be logged there.
1430 file_path.try_remove(path)
1431 raise
1432 # Make the file read-only in the cache. This has a few side-effects since
1433 # the file node is modified, so every directory entries to this file becomes
1434 # read-only. It's fine here because it is a new file.
1435 file_path.set_read_only(path, True)
1436 with self._lock:
1437 self._add(digest, size)
1438
1439 def hardlink(self, digest, dest, file_mode):
1440 """Hardlinks the file to |dest|.
1441
1442 Note that the file permission bits are on the file node, not the directory
1443 entry, so changing the access bit on any of the directory entries for the
1444 file node will affect them all.
1445 """
1446 path = self._path(digest)
1447 # TODO(maruel): file_path.HARDLINK_WITH_FALLBACK ?
1448 file_path.hardlink(path, dest)
1449 if file_mode is not None:
1450 # Ignores all other bits.
1451 os.chmod(dest, file_mode & 0500)
1452
1453 def _load(self):
1454 """Loads state of the cache from json file."""
1455 self._lock.assert_locked()
1456
1457 if not os.path.isdir(self.cache_dir):
1458 os.makedirs(self.cache_dir)
1459 else:
1460 # Make sure the cache is read-only.
1461 # TODO(maruel): Calculate the cost and optimize the performance
1462 # accordingly.
1463 file_path.make_tree_read_only(self.cache_dir)
1464
1465 # Load state of the cache.
1466 if os.path.isfile(self.state_file):
1467 try:
1468 self._lru = lru.LRUDict.load(self.state_file)
1469 except ValueError as err:
1470 logging.error('Failed to load cache state: %s' % (err,))
1471 # Don't want to keep broken state file.
1472 file_path.try_remove(self.state_file)
1473
1474 # Ensure that all files listed in the state still exist and add new ones.
1475 previous = self._lru.keys_set()
1476 unknown = []
1477 for filename in os.listdir(self.cache_dir):
1478 if filename == self.STATE_FILE:
1479 continue
1480 if filename in previous:
1481 previous.remove(filename)
1482 continue
1483 # An untracked file.
1484 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1485 logging.warning('Removing unknown file %s from cache', filename)
1486 file_path.try_remove(self._path(filename))
1487 continue
1488 # File that's not referenced in 'state.json'.
1489 # TODO(vadimsh): Verify its SHA1 matches file name.
1490 logging.warning('Adding unknown file %s to cache', filename)
1491 unknown.append(filename)
1492
1493 if unknown:
1494 # Add as oldest files. They will be deleted eventually if not accessed.
1495 self._add_oldest_list(unknown)
1496 logging.warning('Added back %d unknown files', len(unknown))
1497
1498 if previous:
1499 # Filter out entries that were not found.
1500 logging.warning('Removed %d lost files', len(previous))
1501 for filename in previous:
1502 self._lru.pop(filename)
1503 self._trim()
1504
1505 def _save(self):
1506 """Saves the LRU ordering."""
1507 self._lock.assert_locked()
1508 if sys.platform != 'win32':
1509 d = os.path.dirname(self.state_file)
1510 if os.path.isdir(d):
1511 # Necessary otherwise the file can't be created.
1512 file_path.set_read_only(d, False)
1513 if os.path.isfile(self.state_file):
1514 file_path.set_read_only(self.state_file, False)
1515 self._lru.save(self.state_file)
1516
1517 def _trim(self):
1518 """Trims anything we don't know, make sure enough free space exists."""
1519 self._lock.assert_locked()
1520
1521 # Ensure maximum cache size.
1522 if self.policies.max_cache_size:
1523 total_size = sum(self._lru.itervalues())
1524 while total_size > self.policies.max_cache_size:
1525 total_size -= self._remove_lru_file()
1526
1527 # Ensure maximum number of items in the cache.
1528 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1529 for _ in xrange(len(self._lru) - self.policies.max_items):
1530 self._remove_lru_file()
1531
1532 # Ensure enough free space.
1533 self._free_disk = file_path.get_free_space(self.cache_dir)
1534 trimmed_due_to_space = False
1535 while (
1536 self.policies.min_free_space and
1537 self._lru and
1538 self._free_disk < self.policies.min_free_space):
1539 trimmed_due_to_space = True
1540 self._remove_lru_file()
1541 self._free_disk = file_path.get_free_space(self.cache_dir)
1542 if trimmed_due_to_space:
1543 total_usage = sum(self._lru.itervalues())
1544 usage_percent = 0.
1545 if total_usage:
1546 usage_percent = 100. * self.policies.max_cache_size / float(total_usage)
1547 logging.warning(
1548 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1549 'cache (%.1f%% of its maximum capacity)',
1550 self._free_disk / 1024.,
1551 total_usage / 1024.,
1552 usage_percent)
1553 self._save()
1554
1555 def _path(self, digest):
1556 """Returns the path to one item."""
1557 return os.path.join(self.cache_dir, digest)
1558
1559 def _remove_lru_file(self):
1560 """Removes the last recently used file and returns its size."""
1561 self._lock.assert_locked()
1562 digest, size = self._lru.pop_oldest()
1563 self._delete_file(digest, size)
1564 return size
1565
1566 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1567 """Adds an item into LRU cache marking it as a newest one."""
1568 self._lock.assert_locked()
1569 if size == UNKNOWN_FILE_SIZE:
1570 size = os.stat(self._path(digest)).st_size
1571 self._added.append(size)
1572 self._lru.add(digest, size)
1573
1574 def _add_oldest_list(self, digests):
1575 """Adds a bunch of items into LRU cache marking them as oldest ones."""
1576 self._lock.assert_locked()
1577 pairs = []
1578 for digest in digests:
1579 size = os.stat(self._path(digest)).st_size
1580 self._added.append(size)
1581 pairs.append((digest, size))
1582 self._lru.batch_insert_oldest(pairs)
1583
1584 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1585 """Deletes cache file from the file system."""
1586 self._lock.assert_locked()
1587 try:
1588 if size == UNKNOWN_FILE_SIZE:
1589 size = os.stat(self._path(digest)).st_size
1590 file_path.try_remove(self._path(digest))
1591 self._removed.append(size)
1592 except OSError as e:
1593 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1594
1595
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    # List of strings: the command to run, filled in by _update_self().
    self.command = []
    # Map of relative file path -> properties dict, merged across includes.
    self.files = {}
    # Tri-state: None (unset), or the 'read_only' value of the first
    # .isolated file (in traversal order) that defines it.
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.

    Arguments:
      fetch_queue: FetchQueue used both to download *.isolated files and to
          enqueue the data files they reference.
      root_isolated_hash: digest of the root *.isolated file.
      algo: hashing algorithm class used by the namespace.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      # Enqueues one *.isolated file at HIGH priority; includes must arrive
      # before data files so the traversal below can make progress.
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files. Because nodes are processed in traversal order, an
    embedding (earlier) .isolated wins over included ones for the same path.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        # 'h' is absent e.g. for symlinks; only real blobs are downloaded.
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes). The first node
    that defines each property wins.
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1717
1718
def get_storage_api(file_or_url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for
  a better alternative.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of isolate
        service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  # URLs map to the networked backend, anything else to the local one.
  storage_cls = IsolateServer if file_path.is_url(file_or_url) else FileSystem
  return storage_cls(file_or_url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001740
1741
def get_storage(file_or_url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    file_or_url: a file path to use file system based storage, or URL of isolate
        service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  storage_api = get_storage_api(file_or_url, namespace)
  return Storage(storage_api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001756
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001757
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Turn |infiles| into FileItem objects, dropping duplicates and symlinks.
  # Symlinks are not represented by items on the isolate server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    if 'l' in metadata or filepath in seen:
      skipped += 1
      continue
    seen.add(filepath)
    items.append(
        FileItem(
            path=filepath,
            digest=metadata['h'],
            size=metadata['s'],
            high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001787
1788
def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file, or a path to a local
        *.isolated file (it is then injected into the cache first).
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.

  Raises:
    isolated_format.MappingError: |isolated_hash| is neither a valid hash nor
        a readable file, or the cache evicted files that were just fetched.
    isolated_format.IsolatedError: require_command is set but no command found.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        try:
          isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intent to pass a '
              'valid hash?' % isolated_hash)

    # Load all *.isolated and start loading rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)
    if require_command and not bundle.command:
      # TODO(vadimsh): All fetch operations are already enqueued and there's no
      # easy way to cancel them.
      raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props). Several mapped paths
      # may share one blob; they are all hardlinked when the blob arrives.
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001876
1877
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  # Symlinks are only expanded on POSIX.
  follow_symlinks = sys.platform != 'win32'
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, follow_symlinks)
  metadata = {}
  for relpath in paths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, False, algo)
    # Timestamps are not wanted in the .isolated metadata.
    meta.pop('t')
    metadata[relpath] = meta
  # Entries without 'h' (e.g. symlinks) have no blob to upload.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
1899
1900
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    List of (digest, path) pairs, one per entry in |files|, in input order.

  Raises:
    Error: on duplicate entries, a path that is neither file nor directory,
        or an OSError while processing an entry.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix='isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix='.isolated')
          # mkstemp opens the file; close the fd, save_isolated reopens it.
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          # Upload the directory content plus the generated .isolated itself;
          # the .isolated is high priority so dependents can parse it early.
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded but we don't
    # much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    # Remove the generated .isolated files; their content is already uploaded.
    if tempdir:
      file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001970
1971
def archive(out, namespace, files, blacklist):
  """Uploads |files| to |out| and prints one '<hash> <path>' line per entry."""
  if files == ['-']:
    # NOTE(review): lines read from stdin keep their trailing newline;
    # confirm downstream path handling tolerates (or strips) it.
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [name.decode('utf-8') for name in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, files, blacklist)
  print('\n'.join('%s %s' % (digest, path) for digest, path in results))
1984
1985
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser, False)
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  # Authentication is only needed for the networked backend, not for a
  # file system based one.
  if file_path.is_url(options.isolate_server):
    auth.ensure_logged_in(options.isolate_server)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    # Surface user errors through optparse, which exits with an error code.
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002013
2014
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser, True)
  parser.add_option(
      '-i', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  options, args = parser.parse_args(args)
  process_isolate_server_options(parser, options)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # --isolated and --file are mutually exclusive and exactly one is required.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  options.target = os.path.abspath(options.target)

  # The data source is either a real server or a local --indir directory.
  remote = options.isolate_server or options.indir
  if file_path.is_url(remote):
    auth.ensure_logged_in(remote)

  with get_storage(remote, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      # Drain the channel; each pull() returns the digest of a completed fetch.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=MemoryCache(),
          outdir=options.target,
          require_command=False)
      # 'rel' is already anchored at options.target (which is absolute); the
      # previous code joined it with options.target a second time, which was a
      # no-op only because the first component was absolute.
      rel = os.path.join(options.target, bundle.relative_cwd)
      print('To run this test please run from the directory %s:' % rel)
      print('  ' + ' '.join(bundle.command))

  return 0
2077
2078
def add_isolate_server_options(parser, add_indir):
  """Registers --isolate-server and --namespace on the given option parser.

  Arguments:
    parser: optparse-compatible parser to register the options on.
    add_indir: when True, also registers --indir as a local-directory
        alternative to a real server.
  """
  default_server = os.environ.get('ISOLATE_SERVER', '')
  parser.add_option(
      '-I', '--isolate-server', metavar='URL', default=default_server,
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
  if add_indir:
    parser.add_option(
        '--indir', metavar='DIR',
        help='Directory used to store the hashtable instead of using an '
             'isolate server.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002098
2099
def process_isolate_server_options(parser, options):
  """Validates and normalizes the --isolate-server / --indir options.

  Aborts via parser.error() when neither is given, or when both are.
  Normalizes options.isolate_server into a canonical https URL, or
  options.indir into an absolute native path.
  """
  has_indir = hasattr(options, 'indir')
  if options.isolate_server:
    if has_indir and options.indir:
      parser.error('Use only one of --indir or --isolate-server.')
  elif not has_indir:
    parser.error('--isolate-server is required.')
  elif not options.indir:
    parser.error('Use one of --indir or --isolate-server.')

  if options.isolate_server:
    url = urlparse.urlparse(options.isolate_server, 'https')
    if url.query:
      parser.error('--isolate-server doesn\'t support query parameter.')
    if url.fragment:
      parser.error('--isolate-server doesn\'t support fragment in the url.')
    # urlparse('foo.com') yields netloc='', path='foo.com'; move the host out
    # of the path component so urlunparse() rebuilds a proper URL.
    fields = list(url)
    if not fields[1] and fields[2]:
      fields[1] = fields[2].rstrip('/')
      fields[2] = ''
    fields[2] = fields[2].rstrip('/')
    options.isolate_server = urlparse.urlunparse(fields)
    on_error.report_on_exception_exit(options.isolate_server)
    return

  # --indir path: must be a local directory, never a URL.
  if file_path.is_url(options.indir):
    parser.error('Can\'t use an URL for --indir.')
  indir = unicode(options.indir).replace('/', os.path.sep)
  options.indir = os.path.abspath(
      os.path.normpath(os.path.join(os.getcwd(), indir)))
  if not os.path.isdir(options.indir):
    parser.error('Path given to --indir must exist.')
2138
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002139
class OptionParserIsolateServer(tools.OptionParserWithLogging):
  """Option parser for the isolateserver subcommands.

  Extends the logging-aware parser with authentication options, and processes
  them automatically in parse_args().
  """

  def __init__(self, **kwargs):
    # Explicit base-class call (not super()): optparse parsers are old-style
    # classes under Python 2.
    tools.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    """Parses arguments, then consumes the auth-related options."""
    options, args = tools.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args
2154
2155
def main(args):
  """Runs the subcommand named in args and returns its exit code."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002159
2160
if __name__ == '__main__':
  # Console setup must happen before any output: encoding fix, unbuffered
  # streams, then colorama for ANSI colors on Windows.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))