blob: a0c52734415ffecb37c697afc09f87148cb2c810 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Cory Massarocc19c8c2015-03-10 13:35:11 -07008__version__ = '0.4.3'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
Cory Massarocc19c8c2015-03-10 13:35:11 -070010import base64
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000011import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040013import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040016import signal
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000017import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050018import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000019import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000020import time
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -050021import types
maruel@chromium.orge82112e2013-04-24 14:41:55 +000022import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050023import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000024import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000025
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000026from third_party import colorama
27from third_party.depot_tools import fix_encoding
28from third_party.depot_tools import subcommand
29
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050030from utils import file_path
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040031from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000032from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040033from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000034from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000035from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000036
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080037import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040038import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080039
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000040
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0]
# files are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. Numbers here are a trade-off; the
# more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousands files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of files never to be archived, matched against relative paths.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None
104
105
class Error(Exception):
  """Generic runtime error raised by this module."""
109
110
class Aborted(Error):
  """Operation aborted.

  Raised by in-flight push/contains tasks after Storage.abort() has been
  called (e.g. from the SIGINT/SIGTERM handler installed by Storage.__enter__).
  """
  pass
114
115
def stream_read(stream, chunk_size):
  """Reads chunks of up to |chunk_size| from |stream| and yields them.

  Stops at the first empty read, which signals end of stream.
  """
  data = stream.read(chunk_size)
  while data:
    yield data
    data = stream.read(chunk_size)
123
124
def file_read(filepath, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with open(filepath, 'rb') as stream:
    # Seek only when needed; offset 0 is the common case.
    if offset:
      stream.seek(offset)
    chunk = stream.read(chunk_size)
    while chunk:
      yield chunk
      chunk = stream.read(chunk_size)
135
136
def file_write(filepath, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent = os.path.dirname(filepath)
  if not os.path.isdir(parent):
    os.makedirs(parent)
  written = 0
  with open(filepath, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000155
156
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    # zlib may buffer the input internally and return nothing yet.
    if out:
      yield out
  # Flush whatever zlib still holds to finalize the stream.
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
167
168
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      # Cap the output at |chunk_size|; input that would produce more output
      # is kept by zlib in |unconsumed_tail|.
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # Drain the leftover input in |chunk_size|-bounded steps.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    # Emit any output zlib buffered past the end of the compressed stream.
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    # Report how far we got to help diagnose truncated downloads.
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
199
200
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store only) for file types that are already compressed,
  7 otherwise.
  """
  # os.path.splitext() returns the extension with its leading dot (e.g.
  # '.png') while ALREADY_COMPRESSED_TYPES lists bare extensions ('png').
  # Strip the dot, otherwise the membership test below never matches and
  # already-compressed files are pointlessly recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
206
207
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file, relative to the base.
  needed = set()
  for relpath in files:
    parent = os.path.dirname(relpath)
    while parent and parent not in needed:
      needed.add(parent)
      parent = os.path.dirname(parent)
  # Lexicographic order guarantees parents are created before children.
  for directory in sorted(needed):
    os.mkdir(os.path.join(base_directory, directory))
220
221
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for relpath, properties in files:
    # Only entries carrying an 'l' (link target) property are symlinks.
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
      continue
    destination = os.path.join(base_directory, relpath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000234
235
def is_valid_file(filepath, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    # No expected size to compare against; just check existence.
    return os.path.isfile(filepath)
  on_disk = os.stat(filepath).st_size
  if on_disk != size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(filepath), on_disk, size)
    return False
  return True
250
251
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size is already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    # Both values already known -> nothing to do.
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    length = 0
    for chunk in self.content():
      hasher.update(chunk)
      length += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = length
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000291
292
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Stat the file now only when the size isn't already known.
    actual_size = os.stat(path).st_size if size is None else size
    super(FileItem, self).__init__(digest, actual_size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    # Stream the file lazily instead of loading it all in memory.
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000310
311
class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Size is the buffer length; digest is left to prepare() to compute.
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    # The entire buffer is returned as a single chunk.
    return [self.buffer]
321
322
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000323class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800324 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000325
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800326 Implements compression support, parallel 'contains' checks, parallel uploads
327 and more.
328
329 Works only within single namespace (and thus hashing algorithm and compression
330 scheme are fixed).
331
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400332 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
333 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800334 """
335
  def __init__(self, storage_api):
    """Initializes the wrapper around a concrete StorageApi instance.

    Arguments:
      storage_api: StorageApi instance that performs the actual network I/O;
          its |namespace| attribute defines hashing and compression scheme.
    """
    self._storage_api = storage_api
    # Whether payloads must be zipped is implied by the namespace name.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily by the corresponding properties and
    # torn down by close().
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Set to True by abort(); read by in-flight tasks without locking.
    self._aborted = False
    # Signal handlers replaced by __enter__, restored by __exit__.
    self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000345
  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    Resolved once in __init__ and cached.
    """
    return self._hash_algo
353
  @property
  def location(self):
    """URL of the backing store that this class is using."""
    # Delegates straight to the underlying StorageApi.
    return self._storage_api.location
358
  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    # Delegates straight to the underlying StorageApi.
    return self._storage_api.namespace
366
367 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000368 def cpu_thread_pool(self):
369 """ThreadPool for CPU-bound tasks like zipping."""
370 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500371 threads = max(threading_utils.num_processors(), 2)
372 if sys.maxsize <= 2L**32:
373 # On 32 bits userland, do not try to use more than 16 threads.
374 threads = min(threads, 16)
375 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000376 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000377
  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    # Created lazily on first use; joined, closed and reset by close().
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000384
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000385 def close(self):
386 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400387 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000388 if self._cpu_thread_pool:
389 self._cpu_thread_pool.join()
390 self._cpu_thread_pool.close()
391 self._cpu_thread_pool = None
392 if self._net_thread_pool:
393 self._net_thread_pool.join()
394 self._net_thread_pool.close()
395 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400396 logging.info('Done.')
397
398 def abort(self):
399 """Cancels any pending or future operations."""
400 # This is not strictly theadsafe, but in the worst case the logging message
401 # will be printed twice. Not a big deal. In other places it is assumed that
402 # unprotected reads and writes to _aborted are serializable (it is true
403 # for python) and thus no locking is used.
404 if not self._aborted:
405 logging.warning('Aborting... It can take a while.')
406 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000407
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000408 def __enter__(self):
409 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400410 assert not self._prev_sig_handlers, self._prev_sig_handlers
411 for s in (signal.SIGINT, signal.SIGTERM):
412 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000413 return self
414
415 def __exit__(self, _exc_type, _exc_value, _traceback):
416 """Context manager interface."""
417 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400418 while self._prev_sig_handlers:
419 s, h = self._prev_sig_handlers.popitem()
420 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000421 return False
422
  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      # setdefault returns the already-stored item when the digest repeats.
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          # pull() blocks until one of the async_push tasks completes.
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    # Items already on the server are the "cache hits".
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded
497
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800498 def get_fetch_url(self, item):
499 """Returns an URL that can be used to fetch given item once it's uploaded.
500
501 Note that if namespace uses compression, data at given URL is compressed.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000502
503 Arguments:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800504 item: Item to get fetch URL for.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000505
506 Returns:
507 An URL or None if underlying protocol doesn't support this.
508 """
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700509 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800510 return self._storage_api.get_fetch_url(item.digest)
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000511
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      # Bail out quickly if abort() was called meanwhile.
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        # Propagate the failure to whoever pulls from |channel|.
        channel.send_exception()
        return
      # Zipping is done; hand the actual network push to the net pool.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000563
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800564 def push(self, item, push_state):
565 """Synchronously pushes a single item to the server.
566
567 If you need to push many items at once, consider using 'upload_items' or
568 'async_push' with instance of TaskChannel.
569
570 Arguments:
571 item: item to upload as instance of Item class.
572 push_state: push state returned by 'get_missing_items' call for |item|.
573
574 Returns:
575 Pushed item (same object as |item|).
576 """
577 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700578 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800579 self.async_push(channel, item, push_state)
580 pushed = channel.pull()
581 assert pushed is item
582 return item
583
  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        # Stored data is compressed when the namespace says so; decompress
        # on the fly while streaming.
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
612
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000613 def get_missing_items(self, items):
614 """Yields items that are missing from the server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000615
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000616 Issues multiple parallel queries via StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000617
618 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000619 items: a list of Item objects to check.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000620
621 Yields:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800622 For each missing item it yields a pair (item, push_state), where:
623 * item - Item object that is missing (one of |items|).
624 * push_state - opaque object that contains storage specific information
625 describing how to upload the item (for example in case of cloud
626 storage, it is signed upload URLs). It can later be passed to
627 'async_push'.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000628 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000629 channel = threading_utils.TaskChannel()
630 pending = 0
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800631
632 # Ensure all digests are calculated.
633 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700634 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800635
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400636 def contains(batch):
637 if self._aborted:
638 raise Aborted()
639 return self._storage_api.contains(batch)
640
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000641 # Enqueue all requests.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800642 for batch in batch_items_for_check(items):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400643 self.net_thread_pool.add_task_with_channel(
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400644 channel, threading_utils.PRIORITY_HIGH, contains, batch)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000645 pending += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800646
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000647 # Yield results as they come in.
648 for _ in xrange(pending):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800649 for missing_item, push_state in channel.pull().iteritems():
650 yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000651
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000652
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  # Batch sizes follow the ITEMS_PER_CONTAINS_QUERIES schedule; once the
  # schedule runs out, its last entry is reused for all remaining batches.
  last_index = len(ITEMS_PER_CONTAINS_QUERIES) - 1
  batches_emitted = 0
  limit = ITEMS_PER_CONTAINS_QUERIES[0]
  current = []
  # Largest items first (descending by size).
  for entry in sorted(items, key=lambda x: x.size, reverse=True):
    current.append(entry)
    if len(current) == limit:
      yield current
      current = []
      batches_emitted += 1
      limit = ITEMS_PER_CONTAINS_QUERIES[min(batches_emitted, last_index)]
  if current:
    yield current
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000679
680
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    """
    Arguments:
      storage: object providing async_fetch() (Storage instance).
      cache: LocalCache instance that receives fetched data.
    """
    self.storage = storage
    self.cache = cache
    # Channel through which completed fetches report their digest back.
    self._channel = threading_utils.TaskChannel()
    # Digests currently being downloaded.
    self._pending = set()
    # Every digest ever requested through 'add'; used by verify_all_cached.
    self._accessed = set()
    # Digests already present in the cache; grows as fetches complete.
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      # Blocks until some fetch posts its digest to the channel; a failed
      # fetch re-raises its exception from pull().
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage.

    Returns the hex digest of the injected content.
    """
    with open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
776
777
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    """
    Arguments:
      stream: iterable yielding str chunks of the fetched item.
      expected_size: total expected size in bytes, or UNKNOWN_FILE_SIZE when
          the size is not known and should not be verified.
    """
    self.stream = stream
    self.expected_size = expected_size
    # Number of bytes seen so far; updated by _inspect_chunk.
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.

    Raises:
      IOError if the total received size does not match |expected_size|.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)
    elif (self.expected_size != UNKNOWN_FILE_SIZE and
          self.expected_size != 0):
      # BUG FIX: previously an empty stream skipped verification entirely,
      # so a truncated zero-chunk fetch of a non-empty item passed silently.
      # Reject it the same way _inspect_chunk rejects a short stream.
      raise IOError('Incorrect file size: expected %d, got 0' %
                    self.expected_size)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    # Only the final chunk can complete the size check; intermediate chunks
    # just accumulate the byte count.
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
824
825
class StorageApi(object):
  """Abstract interface for low-level storage backends.

  Implementations know nothing about compression or hashing schemes; those
  concerns are handled by the higher level Storage class. Clients should
  normally go through Storage rather than a StorageApi instance, since
  Storage layers compression and upload optimizations on top of this API.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    The namespace indirectly selects the hashing scheme and the compression
    method in use.
    """
    raise NotImplementedError()

  def get_fetch_url(self, digest):
    """Returns a URL from which the item with |digest| can be downloaded.

    Arguments:
      digest: hex digest of the item to fetch.

    Returns:
      A URL string, or None when the protocol has no such concept.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Downloads an object and yields its content in chunks.

    Arguments:
      digest: hash digest of the item to download.
      offset: byte offset from the start of the file at which to resume.

    Yields:
      str chunks of the downloaded item.
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads |item|, optionally taking its bytes from |content|.

    An item MUST first go through a 'contains' call to obtain |push_state|
    before it can be pushed to the storage.

    Typical usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the bytes to upload differ
    from the bytes the item itself provides. In that case |content| is not
    None and yields chunks of compressed data (derived from item.content());
    the Storage class implements this.

    Arguments:
      item: Item object describing what is being pushed.
      push_state: push state object as returned by a 'contains' call.
      content: generator of chunks to push; item.content() when None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks which of |items| exist on the server, prepares missing ones.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      Dict mapping each missing Item to an opaque push state object that must
      later be passed to 'push'. See the 'push' doc string.
    """
    raise NotImplementedError()
910
911
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800912class _IsolateServerPushState(object):
913 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -0500914
915 Note this needs to be a global class to support pickling.
916 """
917
Cory Massarocc19c8c2015-03-10 13:35:11 -0700918 def __init__(self, preupload_status, size):
919 self.preupload_status = preupload_status
920 gs_upload_url = preupload_status.get('gs_upload_url') or None
921 if gs_upload_url:
922 self.upload_url = gs_upload_url
923 self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
924 else:
925 self.upload_url = '_ah/api/isolateservice/v1/store_inline'
926 self.finalize_url = None
Mike Frysinger27f03da2014-02-12 16:47:01 -0500927 self.uploaded = False
928 self.finalized = False
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -0500929 self.size = size
Mike Frysinger27f03da2014-02-12 16:47:01 -0500930
931
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    """
    Arguments:
      base_url: root URL of the isolate server, e.g. https://host.example.com.
      namespace: isolate namespace; a '-gzip'/'-flate' suffix selects 'flate'
          compression in the namespace dict sent with API calls.
    """
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    # Namespace descriptor embedded in every API request body.
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    # Guards _server_caps and _memory_use.
    self._lock = threading.Lock()
    # Lazily fetched /server_details response, see _server_capabilities.
    self._server_caps = None
    # Total size of uploads currently buffered in memory, see push().
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    """URL of the backing isolate server."""
    return self._base_url

  @property
  def namespace(self):
    """Isolate namespace this instance operates in."""
    return self._namespace

  def get_fetch_url(self, digest):
    """Returns the content-gs retrieve URL for |digest|."""
    assert isinstance(digest, basestring)
    return '%s/content-gs/retrieve/%s/%s' % (
        self._base_url, self._namespace, digest)

  def fetch(self, digest, offset=0):
    """Fetches the content of item |digest|, optionally from |offset|.

    NOTE(review): inline (DB-stored) content is returned as a single decoded
    str, while GS-stored content is returned as a chunk generator from
    stream_read; callers appear to iterate the result either way — confirm
    per-character iteration of the str path is intended.
    """
    assert offset >= 0
    source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self.do_fetch(source_url, digest, offset)

    if not response:
      raise IOError('Attempted to fetch from %s; no data exist.' % source_url)

    # for DB uploads
    content = response.get('content')
    if content is not None:
      return base64.b64decode(content)

    # for GS entities
    connection = net.url_open(response['url'])

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    """Uploads |item| per the StorageApi.push contract.

    Throttles concurrent uploads by accounting their serialized size in
    _memory_use and sleeping until there is budget (see HACK below). On retry
    after a failed finalize, skips the re-upload when push_state.uploaded is
    already True.
    """
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    if isinstance(content, (basestring, list)):
      # Memory is already used, too late.
      with self._lock:
        self._memory_use += push_state.size
    else:
      # TODO(vadimsh): Do not read from |content| generator when retrying push.
      # If |content| is indeed a generator, it can not be re-winded back to the
      # beginning of the stream. A retry will find it exhausted. A possible
      # solution is to wrap |content| generator with some sort of caching
      # restartable generator. It should be done alongside streaming support
      # implementation.
      #
      # In theory, we should keep the generator, so that it is not serialized in
      # memory. Sadly net.HttpService.request() requires the body to be
      # serialized.
      assert isinstance(content, types.GeneratorType), repr(content)
      slept = False
      # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
      # One byte less than 512mb. This is to cope with incompressible content.
      max_size = int(sys.maxsize * 0.25)
      while True:
        with self._lock:
          # This is due to 32 bits python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since it's what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling one upload at once, we can survive. Otherwise, kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after failed finalization call below,
      # no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    """Queries /preupload for |items|, per the StorageApi.contains contract.

    Returns:
      Dict missing Item -> _IsolateServerPushState for it.
    """
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      # 'index' maps the response entry back to its position in |items|.
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items

  def do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hex digest of the item to retrieve.
      offset: byte offset inside the file to start fetching from.

    Returns:
      Decoded JSON response dict (with either inline 'content' or a GS 'url'),
      or None on failure.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def do_push(self, push_state, content):
    """Uploads isolated file either inline to the server or to Google Storage.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance; its upload_url and
          finalize_url select between inline (DB) and GS upload paths.
      content: an iterable that yields 'str' chunks.

    Returns:
      A truthy value on success, falsy on failure.
    """
    # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        url=url)
    return response is not None
1221
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001222
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  Instances may be used from several threads at once, so implementations
  are expected to guard their internal state with a lock.
  """
  # Directory backing the cache; stays None for non-disk implementations.
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  def cached_set(self):
    """Returns a freshly built set of every digest currently cached."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Verifies the item and refreshes its LRU position.

    Arguments:
      digest: hash digest of the item to check.
      size: expected size of the item.

    Returns:
      True when the item is cached and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Drops the item from the cache when present."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns the cached item's content as one str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Consumes the |content| generator and stores the data in the cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Makes |dest| contain the same bytes as cached item |digest|.

    When file_mode is provided it is applied to |dest| (e.g. to carry the
    executable bit where applicable).
    """
    raise NotImplementedError()
1274
1275
class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0o500):
    """Args:
      file_mode_mask: bit mask ANDed with the requested file mode; the default
          strips write bits so all mapped files come out read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    # A dict is not assumed to be thread safe; every access takes the lock.
    self._lock = threading.Lock()
    self._contents = {}

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      self._contents.pop(digest, None)

  def read(self, digest):
    with self._lock:
      return self._contents[digest]

  def write(self, digest, content):
    # Materialize the whole stream first so the lock is held only briefly.
    blob = ''.join(content)
    with self._lock:
      self._contents[digest] = blob

  def hardlink(self, digest, dest, file_mode):
    """Since data is kept in memory, there is no filenode to hardlink."""
    file_write(dest, [self.read(digest)])
    if file_mode is not None:
      os.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001317
1318
class CachePolicies(object):
  """Bundle of limits that tell DiskCache when and how much to trim."""

  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, it unconditionally fill the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size, self.min_free_space, self.max_items = (
        max_cache_size, min_free_space, max_items)
1333
1334
1335class DiskCache(LocalCache):
1336 """Stateful LRU cache in a flat hash table in a directory.
1337
1338 Saves its state as json file.
1339 """
1340 STATE_FILE = 'state.json'
1341
1342 def __init__(self, cache_dir, policies, hash_algo):
1343 """
1344 Arguments:
1345 cache_dir: directory where to place the cache.
1346 policies: cache retention policies.
1347 algo: hashing algorithm used.
1348 """
1349 super(DiskCache, self).__init__()
1350 self.cache_dir = cache_dir
1351 self.policies = policies
1352 self.hash_algo = hash_algo
1353 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
1354
1355 # All protected methods (starting with '_') except _path should be called
1356 # with this lock locked.
1357 self._lock = threading_utils.LockWithAssert()
1358 self._lru = lru.LRUDict()
1359
1360 # Profiling values.
1361 self._added = []
1362 self._removed = []
1363 self._free_disk = 0
1364
1365 with tools.Profiler('Setup'):
1366 with self._lock:
1367 self._load()
1368
1369 def __enter__(self):
1370 return self
1371
1372 def __exit__(self, _exc_type, _exec_value, _traceback):
1373 with tools.Profiler('CleanupTrimming'):
1374 with self._lock:
1375 self._trim()
1376
1377 logging.info(
1378 '%5d (%8dkb) added',
1379 len(self._added), sum(self._added) / 1024)
1380 logging.info(
1381 '%5d (%8dkb) current',
1382 len(self._lru),
1383 sum(self._lru.itervalues()) / 1024)
1384 logging.info(
1385 '%5d (%8dkb) removed',
1386 len(self._removed), sum(self._removed) / 1024)
1387 logging.info(
1388 ' %8dkb free',
1389 self._free_disk / 1024)
1390 return False
1391
1392 def cached_set(self):
1393 with self._lock:
1394 return self._lru.keys_set()
1395
1396 def touch(self, digest, size):
1397 """Verifies an actual file is valid.
1398
1399 Note that is doesn't compute the hash so it could still be corrupted if the
1400 file size didn't change.
1401
1402 TODO(maruel): More stringent verification while keeping the check fast.
1403 """
1404 # Do the check outside the lock.
1405 if not is_valid_file(self._path(digest), size):
1406 return False
1407
1408 # Update it's LRU position.
1409 with self._lock:
1410 if digest not in self._lru:
1411 return False
1412 self._lru.touch(digest)
1413 return True
1414
1415 def evict(self, digest):
1416 with self._lock:
1417 self._lru.pop(digest)
1418 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1419
1420 def read(self, digest):
1421 with open(self._path(digest), 'rb') as f:
1422 return f.read()
1423
1424 def write(self, digest, content):
1425 path = self._path(digest)
1426 # A stale broken file may remain. It is possible for the file to have write
1427 # access bit removed which would cause the file_write() call to fail to open
1428 # in write mode. Take no chance here.
1429 file_path.try_remove(path)
1430 try:
1431 size = file_write(path, content)
1432 except:
1433 # There are two possible places were an exception can occur:
1434 # 1) Inside |content| generator in case of network or unzipping errors.
1435 # 2) Inside file_write itself in case of disk IO errors.
1436 # In any case delete an incomplete file and propagate the exception to
1437 # caller, it will be logged there.
1438 file_path.try_remove(path)
1439 raise
1440 # Make the file read-only in the cache. This has a few side-effects since
1441 # the file node is modified, so every directory entries to this file becomes
1442 # read-only. It's fine here because it is a new file.
1443 file_path.set_read_only(path, True)
1444 with self._lock:
1445 self._add(digest, size)
1446
1447 def hardlink(self, digest, dest, file_mode):
1448 """Hardlinks the file to |dest|.
1449
1450 Note that the file permission bits are on the file node, not the directory
1451 entry, so changing the access bit on any of the directory entries for the
1452 file node will affect them all.
1453 """
1454 path = self._path(digest)
1455 # TODO(maruel): file_path.HARDLINK_WITH_FALLBACK ?
1456 file_path.hardlink(path, dest)
1457 if file_mode is not None:
1458 # Ignores all other bits.
1459 os.chmod(dest, file_mode & 0500)
1460
1461 def _load(self):
1462 """Loads state of the cache from json file."""
1463 self._lock.assert_locked()
1464
1465 if not os.path.isdir(self.cache_dir):
1466 os.makedirs(self.cache_dir)
1467 else:
1468 # Make sure the cache is read-only.
1469 # TODO(maruel): Calculate the cost and optimize the performance
1470 # accordingly.
1471 file_path.make_tree_read_only(self.cache_dir)
1472
1473 # Load state of the cache.
1474 if os.path.isfile(self.state_file):
1475 try:
1476 self._lru = lru.LRUDict.load(self.state_file)
1477 except ValueError as err:
1478 logging.error('Failed to load cache state: %s' % (err,))
1479 # Don't want to keep broken state file.
1480 file_path.try_remove(self.state_file)
1481
1482 # Ensure that all files listed in the state still exist and add new ones.
1483 previous = self._lru.keys_set()
1484 unknown = []
1485 for filename in os.listdir(self.cache_dir):
1486 if filename == self.STATE_FILE:
1487 continue
1488 if filename in previous:
1489 previous.remove(filename)
1490 continue
1491 # An untracked file.
1492 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1493 logging.warning('Removing unknown file %s from cache', filename)
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001494 p = self._path(filename)
1495 if os.path.isdir(p):
1496 try:
1497 file_path.rmtree(p)
1498 except OSError:
1499 pass
1500 else:
1501 file_path.try_remove(p)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001502 continue
1503 # File that's not referenced in 'state.json'.
1504 # TODO(vadimsh): Verify its SHA1 matches file name.
1505 logging.warning('Adding unknown file %s to cache', filename)
1506 unknown.append(filename)
1507
1508 if unknown:
1509 # Add as oldest files. They will be deleted eventually if not accessed.
1510 self._add_oldest_list(unknown)
1511 logging.warning('Added back %d unknown files', len(unknown))
1512
1513 if previous:
1514 # Filter out entries that were not found.
1515 logging.warning('Removed %d lost files', len(previous))
1516 for filename in previous:
1517 self._lru.pop(filename)
1518 self._trim()
1519
1520 def _save(self):
1521 """Saves the LRU ordering."""
1522 self._lock.assert_locked()
1523 if sys.platform != 'win32':
1524 d = os.path.dirname(self.state_file)
1525 if os.path.isdir(d):
1526 # Necessary otherwise the file can't be created.
1527 file_path.set_read_only(d, False)
1528 if os.path.isfile(self.state_file):
1529 file_path.set_read_only(self.state_file, False)
1530 self._lru.save(self.state_file)
1531
1532 def _trim(self):
1533 """Trims anything we don't know, make sure enough free space exists."""
1534 self._lock.assert_locked()
1535
1536 # Ensure maximum cache size.
1537 if self.policies.max_cache_size:
1538 total_size = sum(self._lru.itervalues())
1539 while total_size > self.policies.max_cache_size:
1540 total_size -= self._remove_lru_file()
1541
1542 # Ensure maximum number of items in the cache.
1543 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1544 for _ in xrange(len(self._lru) - self.policies.max_items):
1545 self._remove_lru_file()
1546
1547 # Ensure enough free space.
1548 self._free_disk = file_path.get_free_space(self.cache_dir)
1549 trimmed_due_to_space = False
1550 while (
1551 self.policies.min_free_space and
1552 self._lru and
1553 self._free_disk < self.policies.min_free_space):
1554 trimmed_due_to_space = True
1555 self._remove_lru_file()
1556 self._free_disk = file_path.get_free_space(self.cache_dir)
1557 if trimmed_due_to_space:
1558 total_usage = sum(self._lru.itervalues())
1559 usage_percent = 0.
1560 if total_usage:
1561 usage_percent = 100. * self.policies.max_cache_size / float(total_usage)
1562 logging.warning(
1563 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1564 'cache (%.1f%% of its maximum capacity)',
1565 self._free_disk / 1024.,
1566 total_usage / 1024.,
1567 usage_percent)
1568 self._save()
1569
1570 def _path(self, digest):
1571 """Returns the path to one item."""
1572 return os.path.join(self.cache_dir, digest)
1573
1574 def _remove_lru_file(self):
1575 """Removes the last recently used file and returns its size."""
1576 self._lock.assert_locked()
1577 digest, size = self._lru.pop_oldest()
1578 self._delete_file(digest, size)
1579 return size
1580
1581 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1582 """Adds an item into LRU cache marking it as a newest one."""
1583 self._lock.assert_locked()
1584 if size == UNKNOWN_FILE_SIZE:
1585 size = os.stat(self._path(digest)).st_size
1586 self._added.append(size)
1587 self._lru.add(digest, size)
1588
1589 def _add_oldest_list(self, digests):
1590 """Adds a bunch of items into LRU cache marking them as oldest ones."""
1591 self._lock.assert_locked()
1592 pairs = []
1593 for digest in digests:
1594 size = os.stat(self._path(digest)).st_size
1595 self._added.append(size)
1596 pairs.append((digest, size))
1597 self._lru.batch_insert_oldest(pairs)
1598
1599 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1600 """Deletes cache file from the file system."""
1601 self._lock.assert_locked()
1602 try:
1603 if size == UNKNOWN_FILE_SIZE:
1604 size = os.stat(self._path(digest)).st_size
1605 file_path.try_remove(self._path(digest))
1606 self._removed.append(size)
1607 except OSError as e:
1608 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1609
1610
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    # 'command' entry aggregated from the *.isolated files, paths fixed up for
    # the local OS in _update_self().
    self.command = []
    # Maps relative file path -> properties dict (keys like 'h', 's', 'm' as
    # used elsewhere in this file).
    self.files = {}
    # Value of the first 'read_only' entry seen; None if never specified.
    self.read_only = None
    # Working directory relative to the mapped root; fetch() defaults it to ''.
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already being fetched.
    processed = set()

    def retrieve_async(isolated_file):
      # Enqueues one *.isolated for download at high priority, guarding
      # against include cycles via |seen|.
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties. First node in traversal order that defines a property
    # wins, since assignments below only happen when the value is still unset.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1732
1733
def set_storage_api_class(cls):
  """Globally overrides the StorageApi implementation used by default.

  Must be called at most once, before any storage object is created.
  """
  global _storage_api_cls
  assert issubclass(cls, StorageApi)
  assert _storage_api_cls is None
  _storage_api_cls = cls
1740
1741
def get_storage_api(url, namespace):
  """Returns an object that implements low-level StorageApi interface.

  It is used by Storage to work with single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for
  a better alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  # Honor an override installed via set_storage_api_class(), if any.
  return (_storage_api_cls or IsolateServer)(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001760
1761
def get_storage(url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  api = get_storage_api(url, namespace)
  return Storage(api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001775
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001776
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Turn |infiles| into FileItem objects, dropping duplicate paths and
  # symlinks; symlinks are not represented by items on the isolate server.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    if 'l' in metadata or filepath in seen:
      skipped += 1
      continue
    seen.add(filepath)
    items.append(
        FileItem(
            path=filepath,
            digest=metadata['h'],
            size=metadata['s'],
            high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001806
1807
def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    require_command: Ensure *.isolated specifies a command to run.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.

  Raises:
    isolated_format.MappingError if the input is not a valid hash/file or the
        cache cannot hold all requested files.
    isolated_format.IsolatedError if require_command is set but no command is
        found in the bundle.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, require_command)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        try:
          isolated_hash = fetch_queue.inject_local_file(isolated_hash, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intent to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)
      if require_command and not bundle.command:
        # TODO(vadimsh): All fetch operations are already enqueue and there's no
        # easy way to cancel them.
        raise isolated_format.IsolatedError('No command to run')

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      if not os.path.isdir(outdir):
        os.makedirs(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      if not os.path.isdir(cwd):
        os.makedirs(cwd)

      # Multimap: digest -> list of pairs (path, props). Several mapped paths
      # may share the same content digest.
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

  # Cache could evict some items we just tried to fetch, it's a fatal error.
  if not fetch_queue.verify_all_cached():
    raise isolated_format.MappingError(
        'Cache is too small to hold all requested files')
  return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001895
1896
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  follow_symlinks = sys.platform != 'win32'
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, follow_symlinks)
  metadata = {}
  for relpath in paths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    # The timestamp is irrelevant for archival purposes.
    meta.pop('t')
    metadata[relpath] = meta
  items = []
  for relpath, meta in metadata.iteritems():
    # Entries without a hash (e.g. symlinks) have no content to upload.
    if 'h' not in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
1918
1919
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    List of (hash, path) pairs, one per entry in |files|, in the same order.

  Raises:
    Error on duplicate entries, unknown paths or OS level failures.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if os.path.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          # mkstemp() returns an open fd; only the path is needed here.
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          # The generated .isolated is uploaded too, at high priority so the
          # server can process it early.
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=os.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif os.path.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=os.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    # Technically we would care about which files were uploaded but we don't
    # much in practice.
    _uploaded_files = storage.upload_items(items_to_upload)
    return results
  finally:
    if tempdir and os.path.isdir(tempdir):
      file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001989
1990
def archive(out, namespace, files, blacklist):
  """Archives a list of files/directories to the isolate server at |out|.

  Arguments:
    out: URL of the isolate server.
    namespace: isolate namespace to upload into.
    files: list of paths to archive; the single entry ['-'] means read one
        path per line from stdin.
    blacklist: list of regexps of files to omit, passed to
        tools.gen_blacklist().

  Raises:
    Error if there is nothing to upload or on processing failures.
  """
  if files == ['-']:
    # readlines() keeps the trailing '\n' on each entry, which would produce
    # bogus file names; strip line endings and skip blank lines.
    files = [l.rstrip('\n\r') for l in sys.stdin]
    files = [l for l in files if l]

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, files, blacklist)
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
2003
2004
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002005@subcommand.usage('<file1..fileN> or - to read from stdin')
2006def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002007 """Archives data to the server.
2008
2009 If a directory is specified, a .isolated file is created the whole directory
2010 is uploaded. Then this .isolated file can be included in another one to run
2011 commands.
2012
2013 The commands output each file that was processed with its content hash. For
2014 directories, the .isolated generated for the directory is listed as the
2015 directory entry itself.
2016 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002017 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002018 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002019 options, files = parser.parse_args(args)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002020 process_isolate_server_options(parser, options, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002021 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002022 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002023 except Error as e:
2024 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002025 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002026
2027
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default=os.getcwd(),
      help='destination directory')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  # All inputs are passed through options; positional arguments are rejected.
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True)
  # --isolated and --file are mutually exclusive and exactly one is required.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  cache = process_cache_options(options)
  options.target = os.path.abspath(options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      # Maps digest -> destination path so completions (which arrive in any
      # order) can be matched back to their file.
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      # Drain the channel until every requested digest has been fetched.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      # The cache context manager handles its own setup/teardown (no-op for
      # MemoryCache).
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            require_command=False)
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' %
            os.path.join(options.target, rel))
        print(' ' + ' '.join(bundle.command))

  return 0
2091
2092
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002093def add_archive_options(parser):
2094 parser.add_option(
2095 '--blacklist',
2096 action='append', default=list(DEFAULT_BLACKLIST),
2097 help='List of regexp to use as blacklist filter when uploading '
2098 'directories')
2099
2100
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002101def add_isolate_server_options(parser):
2102 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002103 parser.add_option(
2104 '-I', '--isolate-server',
2105 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002106 help='URL of the Isolate Server to use. Defaults to the environment '
2107 'variable ISOLATE_SERVER if set. No need to specify https://, this '
2108 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002109 parser.add_option(
2110 '--namespace', default='default-gzip',
2111 help='The namespace to use on the Isolate Server, default: %default')
2112
2113
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002114def process_isolate_server_options(parser, options, set_exception_handler):
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002115 """Processes the --isolate-server option and aborts if not specified.
2116
2117 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002118 """
2119 if not options.isolate_server:
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002120 parser.error('--isolate-server is required.')
Marc-Antoine Ruel012067b2014-12-10 15:45:42 -05002121 try:
2122 options.isolate_server = net.fix_url(options.isolate_server)
2123 except ValueError as e:
2124 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002125 if set_exception_handler:
2126 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002127 try:
2128 return auth.ensure_logged_in(options.isolate_server)
2129 except ValueError as e:
2130 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002131
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002132
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002133def add_cache_options(parser):
2134 cache_group = optparse.OptionGroup(parser, 'Cache management')
2135 cache_group.add_option(
2136 '--cache', metavar='DIR',
2137 help='Directory to keep a local cache of the files. Accelerates download '
2138 'by reusing already downloaded files. Default=%default')
2139 cache_group.add_option(
2140 '--max-cache-size',
2141 type='int',
2142 metavar='NNN',
2143 default=20*1024*1024*1024,
2144 help='Trim if the cache gets larger than this value, default=%default')
2145 cache_group.add_option(
2146 '--min-free-space',
2147 type='int',
2148 metavar='NNN',
2149 default=2*1024*1024*1024,
2150 help='Trim if disk free space becomes lower than this value, '
2151 'default=%default')
2152 cache_group.add_option(
2153 '--max-items',
2154 type='int',
2155 metavar='NNN',
2156 default=100000,
2157 help='Trim if more than this number of items are in the cache '
2158 'default=%default')
2159 parser.add_option_group(cache_group)
2160
2161
def process_cache_options(options):
  """Returns the cache instance matching the parsed cache options.

  Returns a DiskCache rooted at --cache when it is set, a MemoryCache
  otherwise.
  """
  if not options.cache:
    return MemoryCache()
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  # |options.cache| path may not exist until DiskCache() instance is created.
  return DiskCache(
      unicode(os.path.abspath(options.cache)),
      policies,
      isolated_format.get_hash_algo(options.namespace))
2174
2175
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002176class OptionParserIsolateServer(tools.OptionParserWithLogging):
2177 def __init__(self, **kwargs):
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002178 tools.OptionParserWithLogging.__init__(
2179 self,
2180 version=__version__,
2181 prog=os.path.basename(sys.modules[__name__].__file__),
2182 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002183 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002184
2185 def parse_args(self, *args, **kwargs):
2186 options, args = tools.OptionParserWithLogging.parse_args(
2187 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002188 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002189 return options, args
2190
2191
def main(args):
  """Dispatches |args| to the matching CMD* function, returns its exit code."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002195
2196
if __name__ == '__main__':
  # Prepare the terminal before any output is produced: fix stdout/stderr
  # encoding, disable stream buffering and initialize colorama (which enables
  # ANSI color handling on Windows).
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))