blob: 2ffd7dcce61aba64d07706dd4626d2def8159dc0 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruelea586f32016-04-05 11:11:33 -07002# Copyright 2013 The LUCI Authors. All rights reserved.
maruelf1f5e2a2016-05-25 17:10:39 -07003# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
nodir55be77b2016-05-03 09:39:57 -07008__version__ = '0.4.8'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
Cory Massarocc19c8c2015-03-10 13:35:11 -070010import base64
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000011import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040013import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040016import signal
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000017import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050018import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000019import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000020import time
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -050021import types
maruel@chromium.orge82112e2013-04-24 14:41:55 +000022import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050023import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000024import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000025
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000026from third_party import colorama
27from third_party.depot_tools import fix_encoding
28from third_party.depot_tools import subcommand
29
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050030from utils import file_path
maruel12e30012015-10-09 11:55:35 -070031from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040032from utils import logging_utils
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040033from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000034from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040035from utils import on_error
maruel8e4e40c2016-05-30 06:21:07 -070036from utils import subprocess42
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000037from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000038from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000039
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080040import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040041import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080042
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000043
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of file paths that should never be archived by default.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None
107
108
class Error(Exception):
  """Generic runtime error.

  Base class for the error hierarchy of this module.
  """
112
113
class Aborted(Error):
  """Operation aborted.

  Raised by in-flight tasks once Storage.abort() has been called (e.g. after
  Ctrl+C).
  """
117
118
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as stream:
    if offset:
      stream.seek(offset)
    while True:
      piece = stream.read(chunk_size)
      if not piece:
        # End of file: terminate the generator.
        return
      yield piece
129
130
def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  # Make sure the destination directory exists before opening the file.
  file_path.ensure_tree(os.path.dirname(path))
  written = 0
  with fs.open(path, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000147
148
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  zipper = zlib.compressobj(level)
  for piece in content_generator:
    out = zipper.compress(piece)
    # compressobj buffers internally; it may emit nothing for a given input.
    if out:
      yield out
  # Flush whatever is still buffered, plus the zlib stream terminator.
  trailing = zipper.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
159
160
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  bytes_in = 0
  try:
    for piece in content_generator:
      bytes_in += len(piece)
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      # The |chunk_size| cap leaves unprocessed input in unconsumed_tail;
      # keep draining it in bounded steps before taking the next piece.
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (bytes_in, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
191
192
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store only) for file types that are already compressed, 7 for
  everything else.
  """
  # os.path.splitext() keeps the leading dot (e.g. '.png') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions (e.g. 'png'). Strip the dot,
  # otherwise the membership test below can never match and already-compressed
  # files get pointlessly recompressed.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
198
199
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file (excluding the empty root).
  required = set()
  for f in files:
    subdir = os.path.dirname(f)
    while subdir and subdir not in required:
      required.add(subdir)
      subdir = os.path.dirname(subdir)
  # Lexicographic order guarantees parents are created before their children.
  for d in sorted(required):
    fs.mkdir(os.path.join(base_directory, d))
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000212
213
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for relpath, props in files:
    # Only entries carrying the 'l' (link target) property are symlinks.
    if 'l' not in props:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', relpath)
      continue
    destination = os.path.join(base_directory, relpath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(props['l'], destination)  # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000226
227
def is_valid_file(path, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    # No expected size to compare against; existence is the best we can do.
    return fs.isfile(path)
  on_disk = fs.stat(path).st_size
  if on_disk == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(path), on_disk, size)
  return False
242
243
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    # Default zlib compression level; subclasses may tune per content type.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if digest
    and size is already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      # Both already known; avoid reading content() again.
      return
    hasher = hash_algo()
    length = 0
    for chunk in self.content():
      hasher.update(chunk)
      length += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = length
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000283
284
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Only stat the file when the caller didn't already know its size.
    if size is None:
      size = fs.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    # Stream the file lazily; Storage reads the chunks as it uploads.
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000302
303
class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Size is known up front (len of the buffer); digest is computed lazily
    # by Item.prepare().
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    # The whole buffer as a single chunk.
    return [self.buffer]
313
314
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    # Whether content is zipped for upload / unzipped on fetch; dictated by
    # the namespace name.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily by the corresponding properties below.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Flipped (deliberately without a lock, see abort()) when a termination
    # signal is received.
    self._aborted = False
    # Original signal handlers saved by __enter__, restored by __exit__.
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly theadsafe, but in the worst case the logging message
    # will be printed twice. Not a big deal. In other places it is assumed that
    # unprotected reads and writes to _aborted are serializable (it is true
    # for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    # Route Ctrl+C / termination to abort(), remembering the old handlers.
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    # Restore the signal handlers saved in __enter__.
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Hand the zipped blob to the network pool for the actual upload.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000629
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000630
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800631def batch_items_for_check(items):
632 """Splits list of items to check for existence on the server into batches.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000633
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800634 Each batch corresponds to a single 'exists?' query to the server via a call
635 to StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000636
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800637 Arguments:
638 items: a list of Item objects.
639
640 Yields:
641 Batches of items to query for existence in a single operation,
642 each batch is a list of Item objects.
643 """
644 batch_count = 0
645 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
646 next_queries = []
647 for item in sorted(items, key=lambda x: x.size, reverse=True):
648 next_queries.append(item)
649 if len(next_queries) == batch_size_limit:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000650 yield next_queries
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800651 next_queries = []
652 batch_count += 1
653 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
654 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
655 if next_queries:
656 yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000657
658
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000659class FetchQueue(object):
660 """Fetches items from Storage and places them into LocalCache.
661
662 It manages multiple concurrent fetch operations. Acts as a bridge between
663 Storage and LocalCache so that Storage and LocalCache don't depend on each
664 other at all.
665 """
666
667 def __init__(self, storage, cache):
668 self.storage = storage
669 self.cache = cache
670 self._channel = threading_utils.TaskChannel()
671 self._pending = set()
672 self._accessed = set()
673 self._fetched = cache.cached_set()
674
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400675 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700676 self,
677 digest,
678 size=UNKNOWN_FILE_SIZE,
679 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000680 """Starts asynchronous fetch of item |digest|."""
681 # Fetching it now?
682 if digest in self._pending:
683 return
684
685 # Mark this file as in use, verify_all_cached will later ensure it is still
686 # in cache.
687 self._accessed.add(digest)
688
689 # Already fetched? Notify cache to update item's LRU position.
690 if digest in self._fetched:
691 # 'touch' returns True if item is in cache and not corrupted.
692 if self.cache.touch(digest, size):
693 return
694 # Item is corrupted, remove it from cache and fetch it again.
695 self._fetched.remove(digest)
696 self.cache.evict(digest)
697
698 # TODO(maruel): It should look at the free disk space, the current cache
699 # size and the size of the new item on every new item:
700 # - Trim the cache as more entries are listed when free disk space is low,
701 # otherwise if the amount of data downloaded during the run > free disk
702 # space, it'll crash.
703 # - Make sure there's enough free disk space to fit all dependencies of
704 # this run! If not, abort early.
705
706 # Start fetching.
707 self._pending.add(digest)
708 self.storage.async_fetch(
709 self._channel, priority, digest, size,
710 functools.partial(self.cache.write, digest))
711
712 def wait(self, digests):
713 """Starts a loop that waits for at least one of |digests| to be retrieved.
714
715 Returns the first digest retrieved.
716 """
717 # Flush any already fetched items.
718 for digest in digests:
719 if digest in self._fetched:
720 return digest
721
722 # Ensure all requested items are being fetched now.
723 assert all(digest in self._pending for digest in digests), (
724 digests, self._pending)
725
726 # Wait for some requested item to finish fetching.
727 while self._pending:
728 digest = self._channel.pull()
729 self._pending.remove(digest)
730 self._fetched.add(digest)
731 if digest in digests:
732 return digest
733
734 # Should never reach this point due to assert above.
735 raise RuntimeError('Impossible state')
736
737 def inject_local_file(self, path, algo):
738 """Adds local file to the cache as if it was fetched from storage."""
maruel12e30012015-10-09 11:55:35 -0700739 with fs.open(path, 'rb') as f:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000740 data = f.read()
741 digest = algo(data).hexdigest()
742 self.cache.write(digest, [data])
743 self._fetched.add(digest)
744 return digest
745
746 @property
747 def pending_count(self):
748 """Returns number of items to be fetched."""
749 return len(self._pending)
750
751 def verify_all_cached(self):
752 """True if all accessed items are in cache."""
753 return self._accessed.issubset(self.cache.cached_set())
754
755
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    """Arguments:
      stream: an iterable that yields chunks (str) of the item's content.
      expected_size: total size in bytes, or UNKNOWN_FILE_SIZE if not known.
    """
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    # Number of bytes seen so far; compared to expected_size on completion.
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)
    else:
      # The stream yielded no chunks at all. Previously size verification was
      # silently skipped in this case; run the final check with an empty chunk
      # so an unexpectedly empty stream is detected too.
      self._inspect_chunk('', is_last=True)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    # Size can only be verified on the last chunk, and only when it is known.
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))
803
804
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000805class StorageApi(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800806 """Interface for classes that implement low-level storage operations.
807
808 StorageApi is oblivious of compression and hashing scheme used. This details
809 are handled in higher level Storage class.
810
811 Clients should generally not use StorageApi directly. Storage class is
812 preferred since it implements compression and upload optimizations.
813 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000814
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700815 @property
816 def location(self):
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -0500817 """URL of the backing store that this class is using."""
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700818 raise NotImplementedError()
819
820 @property
821 def namespace(self):
822 """Isolate namespace used by this storage.
823
824 Indirectly defines hashing scheme and compression method used.
825 """
826 raise NotImplementedError()
827
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800828 def fetch(self, digest, offset=0):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000829 """Fetches an object and yields its content.
830
831 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000832 digest: hash digest of item to download.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800833 offset: offset (in bytes) from the start of the file to resume fetch from.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000834
835 Yields:
836 Chunks of downloaded item (as str objects).
837 """
838 raise NotImplementedError()
839
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800840 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000841 """Uploads an |item| with content generated by |content| generator.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000842
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800843 |item| MUST go through 'contains' call to get |push_state| before it can
844 be pushed to the storage.
845
846 To be clear, here is one possible usage:
847 all_items = [... all items to push as Item subclasses ...]
848 for missing_item, push_state in storage_api.contains(all_items).items():
849 storage_api.push(missing_item, push_state)
850
851 When pushing to a namespace with compression, data that should be pushed
852 and data provided by the item is not the same. In that case |content| is
853 not None and it yields chunks of compressed data (using item.content() as
854 a source of original uncompressed data). This is implemented by Storage
855 class.
856
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000857 Arguments:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000858 item: Item object that holds information about an item being pushed.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800859 push_state: push state object as returned by 'contains' call.
860 content: a generator that yields chunks to push, item.content() if None.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000861
862 Returns:
863 None.
864 """
865 raise NotImplementedError()
866
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000867 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800868 """Checks for |items| on the server, prepares missing ones for upload.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000869
870 Arguments:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800871 items: list of Item objects to check for presence.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000872
873 Returns:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800874 A dict missing Item -> opaque push state object to be passed to 'push'.
875 See doc string for 'push'.
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000876 """
877 raise NotImplementedError()
878
879
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800880class _IsolateServerPushState(object):
881 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -0500882
883 Note this needs to be a global class to support pickling.
884 """
885
Cory Massarocc19c8c2015-03-10 13:35:11 -0700886 def __init__(self, preupload_status, size):
887 self.preupload_status = preupload_status
888 gs_upload_url = preupload_status.get('gs_upload_url') or None
889 if gs_upload_url:
890 self.upload_url = gs_upload_url
891 self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
892 else:
893 self.upload_url = '_ah/api/isolateservice/v1/store_inline'
894 self.finalize_url = None
Mike Frysinger27f03da2014-02-12 16:47:01 -0500895 self.uploaded = False
896 self.finalized = False
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -0500897 self.size = size
Mike Frysinger27f03da2014-02-12 16:47:01 -0500898
899
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000900class IsolateServer(StorageApi):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000901 """StorageApi implementation that downloads and uploads to Isolate Server.
902
903 It uploads and downloads directly from Google Storage whenever appropriate.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800904 Works only within single namespace.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000905 """
906
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000907 def __init__(self, base_url, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000908 super(IsolateServer, self).__init__()
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -0500909 assert file_path.is_url(base_url), base_url
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700910 self._base_url = base_url.rstrip('/')
911 self._namespace = namespace
Cory Massarocc19c8c2015-03-10 13:35:11 -0700912 self._namespace_dict = {
913 'compression': 'flate' if namespace.endswith(
914 ('-gzip', '-flate')) else '',
915 'digest_hash': 'sha-1',
916 'namespace': namespace,
917 }
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000918 self._lock = threading.Lock()
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000919 self._server_caps = None
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -0500920 self._memory_use = 0
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000921
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000922 @property
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000923 def _server_capabilities(self):
Cory Massarocc19c8c2015-03-10 13:35:11 -0700924 """Gets server details.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000925
926 Returns:
Cory Massarocc19c8c2015-03-10 13:35:11 -0700927 Server capabilities dictionary as returned by /server_details endpoint.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000928 """
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000929 # TODO(maruel): Make this request much earlier asynchronously while the
930 # files are being enumerated.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800931
932 # TODO(vadimsh): Put |namespace| in the URL so that server can apply
933 # namespace-level ACLs to this call.
Cory Massarocc19c8c2015-03-10 13:35:11 -0700934
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000935 with self._lock:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000936 if self._server_caps is None:
Cory Massarocc19c8c2015-03-10 13:35:11 -0700937 self._server_caps = net.url_read_json(
938 url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
939 data={})
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000940 return self._server_caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000941
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700942 @property
943 def location(self):
944 return self._base_url
945
946 @property
947 def namespace(self):
948 return self._namespace
949
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800950 def fetch(self, digest, offset=0):
Cory Massarocc19c8c2015-03-10 13:35:11 -0700951 assert offset >= 0
952 source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
953 self._base_url)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800954 logging.debug('download_file(%s, %d)', source_url, offset)
Cory Massarocc19c8c2015-03-10 13:35:11 -0700955 response = self.do_fetch(source_url, digest, offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000956
Cory Massarocc19c8c2015-03-10 13:35:11 -0700957 if not response:
maruele154f9c2015-09-14 11:03:15 -0700958 raise IOError(
959 'Attempted to fetch from %s; no data exist: %s / %s.' % (
960 source_url, self._namespace, digest))
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800961
Cory Massarocc19c8c2015-03-10 13:35:11 -0700962 # for DB uploads
963 content = response.get('content')
964 if content is not None:
maruel863ac262016-03-17 11:00:37 -0700965 yield base64.b64decode(content)
966 return
Cory Massarocc19c8c2015-03-10 13:35:11 -0700967
968 # for GS entities
969 connection = net.url_open(response['url'])
maruelf5574752015-09-17 13:40:27 -0700970 if not connection:
971 raise IOError('Failed to download %s / %s' % (self._namespace, digest))
Cory Massarocc19c8c2015-03-10 13:35:11 -0700972
973 # If |offset|, verify server respects it by checking Content-Range.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800974 if offset:
975 content_range = connection.get_header('Content-Range')
976 if not content_range:
977 raise IOError('Missing Content-Range header')
978
979 # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
980 # According to a spec, <size> can be '*' meaning "Total size of the file
981 # is not known in advance".
982 try:
983 match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
984 if not match:
985 raise ValueError()
986 content_offset = int(match.group(1))
987 last_byte_index = int(match.group(2))
988 size = None if match.group(3) == '*' else int(match.group(3))
989 except ValueError:
990 raise IOError('Invalid Content-Range header: %s' % content_range)
991
992 # Ensure returned offset equals requested one.
993 if offset != content_offset:
994 raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
995 offset, content_offset, content_range))
996
997 # Ensure entire tail of the file is returned.
998 if size is not None and last_byte_index + 1 != size:
999 raise IOError('Incomplete response. Content-Range: %s' % content_range)
1000
maruel863ac262016-03-17 11:00:37 -07001001 for data in connection.iter_content(NET_IO_FILE_CHUNK):
1002 yield data
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001003
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001004 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001005 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001006 assert item.digest is not None
1007 assert item.size is not None
1008 assert isinstance(push_state, _IsolateServerPushState)
1009 assert not push_state.finalized
1010
1011 # Default to item.content().
1012 content = item.content() if content is None else content
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001013 logging.info('Push state size: %d', push_state.size)
1014 if isinstance(content, (basestring, list)):
1015 # Memory is already used, too late.
1016 with self._lock:
1017 self._memory_use += push_state.size
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001018 else:
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001019 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1020 # If |content| is indeed a generator, it can not be re-winded back to the
1021 # beginning of the stream. A retry will find it exhausted. A possible
1022 # solution is to wrap |content| generator with some sort of caching
1023 # restartable generator. It should be done alongside streaming support
1024 # implementation.
1025 #
1026 # In theory, we should keep the generator, so that it is not serialized in
1027 # memory. Sadly net.HttpService.request() requires the body to be
1028 # serialized.
1029 assert isinstance(content, types.GeneratorType), repr(content)
1030 slept = False
1031 # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
Marc-Antoine Ruele6677c82015-02-05 14:54:22 -05001032 # One byte less than 512mb. This is to cope with incompressible content.
1033 max_size = int(sys.maxsize * 0.25)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001034 while True:
1035 with self._lock:
1036 # This is due to 32 bits python when uploading very large files. The
1037 # problem is that it's comparing uncompressed sizes, while we care
1038 # about compressed sizes since it's what is serialized in memory.
1039 # The first check assumes large files are compressible and that by
1040 # throttling one upload at once, we can survive. Otherwise, kaboom.
1041 memory_use = self._memory_use
1042 if ((push_state.size >= max_size and not memory_use) or
1043 (memory_use + push_state.size <= max_size)):
1044 self._memory_use += push_state.size
1045 memory_use = self._memory_use
1046 break
1047 time.sleep(0.1)
1048 slept = True
1049 if slept:
1050 logging.info('Unblocked: %d %d', memory_use, push_state.size)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001051
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001052 try:
1053 # This push operation may be a retry after failed finalization call below,
1054 # no need to reupload contents in that case.
1055 if not push_state.uploaded:
1056 # PUT file to |upload_url|.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001057 success = self.do_push(push_state, content)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001058 if not success:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001059 raise IOError('Failed to upload file with hash %s to URL %s' % (
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001060 item.digest, push_state.upload_url))
1061 push_state.uploaded = True
1062 else:
1063 logging.info(
1064 'A file %s already uploaded, retrying finalization only',
1065 item.digest)
1066
1067 # Optionally notify the server that it's done.
1068 if push_state.finalize_url:
1069 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1070 # send it to isolated server. That way isolate server can verify that
1071 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1072 # stored files).
1073 # TODO(maruel): Fix the server to accept properly data={} so
1074 # url_read_json() can be used.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001075 response = net.url_read_json(
1076 url='%s/%s' % (self._base_url, push_state.finalize_url),
1077 data={
1078 'upload_ticket': push_state.preupload_status['upload_ticket'],
1079 })
1080 if not response or not response['ok']:
1081 raise IOError('Failed to finalize file with hash %s.' % item.digest)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001082 push_state.finalized = True
1083 finally:
1084 with self._lock:
1085 self._memory_use -= push_state.size
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001086
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001087 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001088 # Ensure all items were initialized with 'prepare' call. Storage does that.
1089 assert all(i.digest is not None and i.size is not None for i in items)
1090
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001091 # Request body is a json encoded list of dicts.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001092 body = {
1093 'items': [
1094 {
1095 'digest': item.digest,
1096 'is_isolated': bool(item.high_priority),
1097 'size': item.size,
1098 } for item in items
1099 ],
1100 'namespace': self._namespace_dict,
1101 }
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001102
Cory Massarocc19c8c2015-03-10 13:35:11 -07001103 query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001104
1105 # Response body is a list of push_urls (or null if file is already present).
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001106 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001107 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001108 response = net.url_read_json(url=query_url, data=body)
1109 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001110 raise isolated_format.MappingError(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001111 'Failed to execute preupload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001112 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001113 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001114 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001115
1116 # Pick Items that are missing, attach _PushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001117 missing_items = {}
Cory Massarocc19c8c2015-03-10 13:35:11 -07001118 for preupload_status in response.get('items', []):
1119 assert 'upload_ticket' in preupload_status, (
1120 preupload_status, '/preupload did not generate an upload ticket')
1121 index = int(preupload_status['index'])
1122 missing_items[items[index]] = _IsolateServerPushState(
1123 preupload_status, items[index].size)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001124 logging.info('Queried %d files, %d cache hit',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001125 len(items), len(items) - len(missing_items))
1126 return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001127
Cory Massarocc19c8c2015-03-10 13:35:11 -07001128 def do_fetch(self, url, digest, offset):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001129 """Fetches isolated data from the URL.
1130
1131 Used only for fetching files, not for API calls. Can be overridden in
1132 subclasses.
1133
1134 Args:
1135 url: URL to fetch the data from, can possibly return http redirect.
1136 offset: byte offset inside the file to start fetching from.
1137
1138 Returns:
1139 net.HttpResponse compatible object, with 'read' and 'get_header' calls.
1140 """
Cory Massarocc19c8c2015-03-10 13:35:11 -07001141 assert isinstance(offset, int)
1142 data = {
1143 'digest': digest.encode('utf-8'),
1144 'namespace': self._namespace_dict,
1145 'offset': offset,
1146 }
maruel0c25f4f2015-12-15 05:41:17 -08001147 # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
1148 # is added.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001149 return net.url_read_json(
1150 url=url,
1151 data=data,
1152 read_timeout=DOWNLOAD_READ_TIMEOUT)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001153
Cory Massarocc19c8c2015-03-10 13:35:11 -07001154 def do_push(self, push_state, content):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001155 """Uploads isolated file to the URL.
1156
1157 Used only for storing files, not for API calls. Can be overridden in
1158 subclasses.
1159
1160 Args:
1161 url: URL to upload the data to.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001162 push_state: an _IsolateServicePushState instance
1163 item: the original Item to be uploaded
Vadim Shtayura8623c272014-12-01 11:45:27 -08001164 content: an iterable that yields 'str' chunks.
Vadim Shtayura8623c272014-12-01 11:45:27 -08001165 """
1166 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
1167 # upload support is implemented.
1168 if isinstance(content, list) and len(content) == 1:
1169 content = content[0]
1170 else:
1171 content = ''.join(content)
Cory Massarocc19c8c2015-03-10 13:35:11 -07001172
1173 # DB upload
1174 if not push_state.finalize_url:
1175 url = '%s/%s' % (self._base_url, push_state.upload_url)
1176 content = base64.b64encode(content)
1177 data = {
1178 'upload_ticket': push_state.preupload_status['upload_ticket'],
1179 'content': content,
1180 }
1181 response = net.url_read_json(url=url, data=data)
1182 return response is not None and response['ok']
1183
1184 # upload to GS
1185 url = push_state.upload_url
Vadim Shtayura8623c272014-12-01 11:45:27 -08001186 response = net.url_read(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001187 content_type='application/octet-stream',
1188 data=content,
1189 method='PUT',
tandriib44d54d2016-02-10 11:31:41 -08001190 headers={'Cache-Control': 'public, max-age=31536000'},
Cory Massarocc19c8c2015-03-10 13:35:11 -07001191 url=url)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001192 return response is not None
1193
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001194
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001195class LocalCache(object):
1196 """Local cache that stores objects fetched via Storage.
1197
1198 It can be accessed concurrently from multiple threads, so it should protect
1199 its internal state with some lock.
1200 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001201 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001202
maruel064c0a32016-04-05 11:47:15 -07001203 def __init__(self):
1204 self._lock = threading_utils.LockWithAssert()
1205 # Profiling values.
1206 self._added = []
1207 self._initial_number_items = 0
1208 self._initial_size = 0
1209 self._evicted = []
1210 self._linked = []
1211
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001212 def __enter__(self):
1213 """Context manager interface."""
1214 return self
1215
1216 def __exit__(self, _exc_type, _exec_value, _traceback):
1217 """Context manager interface."""
1218 return False
1219
maruel064c0a32016-04-05 11:47:15 -07001220 @property
1221 def added(self):
1222 return self._added[:]
1223
1224 @property
1225 def evicted(self):
1226 return self._evicted[:]
1227
1228 @property
1229 def initial_number_items(self):
1230 return self._initial_number_items
1231
1232 @property
1233 def initial_size(self):
1234 return self._initial_size
1235
1236 @property
1237 def linked(self):
1238 return self._linked[:]
1239
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001240 def cached_set(self):
1241 """Returns a set of all cached digests (always a new object)."""
1242 raise NotImplementedError()
1243
maruel36a963d2016-04-08 17:15:49 -07001244 def cleanup(self):
1245 """Deletes any corrupted item from the cache and trims it if necessary."""
1246 raise NotImplementedError()
1247
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001248 def touch(self, digest, size):
1249 """Ensures item is not corrupted and updates its LRU position.
1250
1251 Arguments:
1252 digest: hash digest of item to check.
1253 size: expected size of this item.
1254
1255 Returns:
1256 True if item is in cache and not corrupted.
1257 """
1258 raise NotImplementedError()
1259
1260 def evict(self, digest):
1261 """Removes item from cache if it's there."""
1262 raise NotImplementedError()
1263
1264 def read(self, digest):
1265 """Returns contents of the cached item as a single str."""
1266 raise NotImplementedError()
1267
1268 def write(self, digest, content):
maruel083fa552016-04-08 14:38:01 -07001269 """Reads data from |content| generator and stores it in cache.
1270
1271 Returns digest to simplify chaining.
1272 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001273 raise NotImplementedError()
1274
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001275 def hardlink(self, digest, dest, file_mode):
1276 """Ensures file at |dest| has same content as cached |digest|.
1277
1278 If file_mode is provided, it is used to set the executable bit if
1279 applicable.
1280 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001281 raise NotImplementedError()
1282
1283
1284class MemoryCache(LocalCache):
1285 """LocalCache implementation that stores everything in memory."""
1286
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001287 def __init__(self, file_mode_mask=0500):
1288 """Args:
1289 file_mode_mask: bit mask to AND file mode with. Default value will make
1290 all mapped files to be read only.
1291 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001292 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001293 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001294 self._contents = {}
1295
1296 def cached_set(self):
1297 with self._lock:
1298 return set(self._contents)
1299
maruel36a963d2016-04-08 17:15:49 -07001300 def cleanup(self):
1301 pass
1302
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001303 def touch(self, digest, size):
1304 with self._lock:
1305 return digest in self._contents
1306
1307 def evict(self, digest):
1308 with self._lock:
maruel064c0a32016-04-05 11:47:15 -07001309 v = self._contents.pop(digest, None)
1310 if v is not None:
1311 self._evicted.add(v)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001312
1313 def read(self, digest):
1314 with self._lock:
1315 return self._contents[digest]
1316
1317 def write(self, digest, content):
1318 # Assemble whole stream before taking the lock.
1319 data = ''.join(content)
1320 with self._lock:
1321 self._contents[digest] = data
maruel064c0a32016-04-05 11:47:15 -07001322 self._added.append(len(data))
maruel083fa552016-04-08 14:38:01 -07001323 return digest
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001324
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001325 def hardlink(self, digest, dest, file_mode):
1326 """Since data is kept in memory, there is no filenode to hardlink."""
maruel064c0a32016-04-05 11:47:15 -07001327 data = self.read(digest)
1328 file_write(dest, [data])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001329 if file_mode is not None:
maruel12e30012015-10-09 11:55:35 -07001330 fs.chmod(dest, file_mode & self._file_mode_mask)
maruel064c0a32016-04-05 11:47:15 -07001331 with self._lock:
1332 self._linked.append(len(data))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001333
1334
class CachePolicies(object):
  """Groups the knobs that decide when a cache must be trimmed."""

  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: trim when the cache grows past this many bytes; 0 means
          never trim on total size (the cache is effectively a leak).
    - min_free_space: trim when disk free space drops below this many bytes; 0
          means the disk may be filled unconditionally.
    - max_items: trim when the cache holds more than this many entries; 0
          means no limit on the number of entries.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
1349
1350
1351class DiskCache(LocalCache):
1352 """Stateful LRU cache in a flat hash table in a directory.
1353
1354 Saves its state as json file.
1355 """
maruel12e30012015-10-09 11:55:35 -07001356 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001357
1358 def __init__(self, cache_dir, policies, hash_algo):
1359 """
1360 Arguments:
1361 cache_dir: directory where to place the cache.
1362 policies: cache retention policies.
1363 algo: hashing algorithm used.
1364 """
maruel064c0a32016-04-05 11:47:15 -07001365 # All protected methods (starting with '_') except _path should be called
1366 # with self._lock held.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001367 super(DiskCache, self).__init__()
1368 self.cache_dir = cache_dir
1369 self.policies = policies
1370 self.hash_algo = hash_algo
1371 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
maruel083fa552016-04-08 14:38:01 -07001372 # Items in a LRU lookup dict(digest: size).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001373 self._lru = lru.LRUDict()
maruel083fa552016-04-08 14:38:01 -07001374 # Current cached free disk space. It is updated by self._trim().
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001375 self._free_disk = 0
maruel083fa552016-04-08 14:38:01 -07001376 # The items that must not be evicted during this run since they were
1377 # referenced.
1378 self._protected = set()
maruel36a963d2016-04-08 17:15:49 -07001379 # Cleanup operations done by self._load(), if any.
1380 self._operations = []
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001381 with tools.Profiler('Setup'):
1382 with self._lock:
maruel083fa552016-04-08 14:38:01 -07001383 # self._load() calls self._trim() which initializes self._free_disk.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001384 self._load()
1385
1386 def __enter__(self):
1387 return self
1388
1389 def __exit__(self, _exc_type, _exec_value, _traceback):
1390 with tools.Profiler('CleanupTrimming'):
1391 with self._lock:
1392 self._trim()
1393
1394 logging.info(
1395 '%5d (%8dkb) added',
1396 len(self._added), sum(self._added) / 1024)
1397 logging.info(
1398 '%5d (%8dkb) current',
1399 len(self._lru),
1400 sum(self._lru.itervalues()) / 1024)
1401 logging.info(
maruel064c0a32016-04-05 11:47:15 -07001402 '%5d (%8dkb) evicted',
1403 len(self._evicted), sum(self._evicted) / 1024)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001404 logging.info(
1405 ' %8dkb free',
1406 self._free_disk / 1024)
1407 return False
1408
1409 def cached_set(self):
1410 with self._lock:
1411 return self._lru.keys_set()
1412
maruel36a963d2016-04-08 17:15:49 -07001413 def cleanup(self):
1414 # At that point, the cache was already loaded, trimmed to respect cache
1415 # policies and invalid files were deleted.
1416 if self._evicted:
1417 logging.info(
1418 'Evicted items with the following sizes: %s', sorted(self._evicted))
1419
1420 # What remains to be done is to hash every single item to
1421 # detect corruption, then save to ensure state.json is up to date.
1422 # Sadly, on a 50Gb cache with 100mib/s I/O, this is still over 8 minutes.
1423 # TODO(maruel): Let's revisit once directory metadata is stored in
1424 # state.json so only the files that had been mapped since the last cleanup()
1425 # call are manually verified.
1426 #
1427 #with self._lock:
1428 # for digest in self._lru:
1429 # if not isolated_format.is_valid_hash(
1430 # self._path(digest), self.hash_algo):
1431 # self.evict(digest)
1432 # logging.info('Deleted corrupted item: %s', digest)
1433
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001434 def touch(self, digest, size):
1435 """Verifies an actual file is valid.
1436
1437 Note that is doesn't compute the hash so it could still be corrupted if the
1438 file size didn't change.
1439
1440 TODO(maruel): More stringent verification while keeping the check fast.
1441 """
1442 # Do the check outside the lock.
1443 if not is_valid_file(self._path(digest), size):
1444 return False
1445
1446 # Update it's LRU position.
1447 with self._lock:
1448 if digest not in self._lru:
1449 return False
1450 self._lru.touch(digest)
maruel083fa552016-04-08 14:38:01 -07001451 self._protected.add(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001452 return True
1453
1454 def evict(self, digest):
1455 with self._lock:
maruel083fa552016-04-08 14:38:01 -07001456 # Do not check for 'digest in self._protected' since it could be because
1457 # the object is corrupted.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001458 self._lru.pop(digest)
1459 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1460
1461 def read(self, digest):
maruel12e30012015-10-09 11:55:35 -07001462 with fs.open(self._path(digest), 'rb') as f:
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001463 return f.read()
1464
1465 def write(self, digest, content):
Marc-Antoine Rueldf4976d2015-04-15 19:56:21 -04001466 assert content is not None
maruel083fa552016-04-08 14:38:01 -07001467 with self._lock:
1468 self._protected.add(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001469 path = self._path(digest)
1470 # A stale broken file may remain. It is possible for the file to have write
1471 # access bit removed which would cause the file_write() call to fail to open
1472 # in write mode. Take no chance here.
1473 file_path.try_remove(path)
1474 try:
1475 size = file_write(path, content)
1476 except:
1477 # There are two possible places were an exception can occur:
1478 # 1) Inside |content| generator in case of network or unzipping errors.
1479 # 2) Inside file_write itself in case of disk IO errors.
1480 # In any case delete an incomplete file and propagate the exception to
1481 # caller, it will be logged there.
1482 file_path.try_remove(path)
1483 raise
1484 # Make the file read-only in the cache. This has a few side-effects since
1485 # the file node is modified, so every directory entries to this file becomes
1486 # read-only. It's fine here because it is a new file.
1487 file_path.set_read_only(path, True)
1488 with self._lock:
1489 self._add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001490 return digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001491
1492 def hardlink(self, digest, dest, file_mode):
1493 """Hardlinks the file to |dest|.
1494
1495 Note that the file permission bits are on the file node, not the directory
1496 entry, so changing the access bit on any of the directory entries for the
1497 file node will affect them all.
1498 """
1499 path = self._path(digest)
maruel1f7e8162015-09-16 10:35:43 -07001500 if not file_path.link_file(dest, path, file_path.HARDLINK_WITH_FALLBACK):
1501 # Report to the server that it failed with more details. We'll want to
1502 # squash them all.
1503 on_error.report('Failed to hardlink\n%s -> %s' % (path, dest))
1504
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001505 if file_mode is not None:
1506 # Ignores all other bits.
maruel12e30012015-10-09 11:55:35 -07001507 fs.chmod(dest, file_mode & 0500)
maruel064c0a32016-04-05 11:47:15 -07001508 with self._lock:
1509 self._linked.append(self._lru[digest])
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001510
1511 def _load(self):
1512 """Loads state of the cache from json file."""
1513 self._lock.assert_locked()
1514
1515 if not os.path.isdir(self.cache_dir):
maruel12e30012015-10-09 11:55:35 -07001516 fs.makedirs(self.cache_dir)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001517 else:
1518 # Make sure the cache is read-only.
1519 # TODO(maruel): Calculate the cost and optimize the performance
1520 # accordingly.
1521 file_path.make_tree_read_only(self.cache_dir)
1522
1523 # Load state of the cache.
maruel12e30012015-10-09 11:55:35 -07001524 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001525 try:
1526 self._lru = lru.LRUDict.load(self.state_file)
1527 except ValueError as err:
1528 logging.error('Failed to load cache state: %s' % (err,))
1529 # Don't want to keep broken state file.
1530 file_path.try_remove(self.state_file)
1531
1532 # Ensure that all files listed in the state still exist and add new ones.
1533 previous = self._lru.keys_set()
1534 unknown = []
maruel12e30012015-10-09 11:55:35 -07001535 for filename in fs.listdir(self.cache_dir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001536 if filename == self.STATE_FILE:
1537 continue
1538 if filename in previous:
maruel064c0a32016-04-05 11:47:15 -07001539 self._initial_size += self._lru[filename]
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001540 previous.remove(filename)
maruel064c0a32016-04-05 11:47:15 -07001541 self._initial_number_items += 1
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001542 continue
1543 # An untracked file.
1544 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1545 logging.warning('Removing unknown file %s from cache', filename)
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001546 p = self._path(filename)
maruel12e30012015-10-09 11:55:35 -07001547 if fs.isdir(p):
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001548 try:
1549 file_path.rmtree(p)
1550 except OSError:
1551 pass
1552 else:
1553 file_path.try_remove(p)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001554 continue
1555 # File that's not referenced in 'state.json'.
1556 # TODO(vadimsh): Verify its SHA1 matches file name.
1557 logging.warning('Adding unknown file %s to cache', filename)
1558 unknown.append(filename)
1559
1560 if unknown:
1561 # Add as oldest files. They will be deleted eventually if not accessed.
maruel064c0a32016-04-05 11:47:15 -07001562 pairs = []
1563 for digest in unknown:
1564 size = fs.stat(self._path(digest)).st_size
1565 self._initial_size += size
1566 self._initial_number_items += 1
1567 pairs.append((digest, size))
1568 self._lru.batch_insert_oldest(pairs)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001569 logging.warning('Added back %d unknown files', len(unknown))
1570
1571 if previous:
1572 # Filter out entries that were not found.
1573 logging.warning('Removed %d lost files', len(previous))
1574 for filename in previous:
1575 self._lru.pop(filename)
1576 self._trim()
1577
1578 def _save(self):
1579 """Saves the LRU ordering."""
1580 self._lock.assert_locked()
1581 if sys.platform != 'win32':
1582 d = os.path.dirname(self.state_file)
maruel12e30012015-10-09 11:55:35 -07001583 if fs.isdir(d):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001584 # Necessary otherwise the file can't be created.
1585 file_path.set_read_only(d, False)
maruel12e30012015-10-09 11:55:35 -07001586 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001587 file_path.set_read_only(self.state_file, False)
1588 self._lru.save(self.state_file)
1589
1590 def _trim(self):
1591 """Trims anything we don't know, make sure enough free space exists."""
1592 self._lock.assert_locked()
1593
1594 # Ensure maximum cache size.
1595 if self.policies.max_cache_size:
1596 total_size = sum(self._lru.itervalues())
1597 while total_size > self.policies.max_cache_size:
1598 total_size -= self._remove_lru_file()
1599
1600 # Ensure maximum number of items in the cache.
1601 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1602 for _ in xrange(len(self._lru) - self.policies.max_items):
1603 self._remove_lru_file()
1604
1605 # Ensure enough free space.
1606 self._free_disk = file_path.get_free_space(self.cache_dir)
1607 trimmed_due_to_space = False
1608 while (
1609 self.policies.min_free_space and
1610 self._lru and
1611 self._free_disk < self.policies.min_free_space):
1612 trimmed_due_to_space = True
1613 self._remove_lru_file()
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001614 if trimmed_due_to_space:
1615 total_usage = sum(self._lru.itervalues())
1616 usage_percent = 0.
1617 if total_usage:
1618 usage_percent = 100. * self.policies.max_cache_size / float(total_usage)
1619 logging.warning(
1620 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1621 'cache (%.1f%% of its maximum capacity)',
1622 self._free_disk / 1024.,
1623 total_usage / 1024.,
1624 usage_percent)
1625 self._save()
1626
1627 def _path(self, digest):
1628 """Returns the path to one item."""
1629 return os.path.join(self.cache_dir, digest)
1630
1631 def _remove_lru_file(self):
1632 """Removes the last recently used file and returns its size."""
1633 self._lock.assert_locked()
maruel083fa552016-04-08 14:38:01 -07001634 try:
1635 digest, size = self._lru.get_oldest()
1636 except KeyError:
1637 raise Error('Nothing to remove')
1638 if digest in self._protected:
1639 raise Error('Not enough space to map the whole isolated tree')
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001640 digest, size = self._lru.pop_oldest()
1641 self._delete_file(digest, size)
1642 return size
1643
1644 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1645 """Adds an item into LRU cache marking it as a newest one."""
1646 self._lock.assert_locked()
1647 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001648 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001649 self._added.append(size)
1650 self._lru.add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001651 self._free_disk -= size
1652 # Do a quicker version of self._trim(). It only enforces free disk space,
1653 # not cache size limits. It doesn't actually look at real free disk space,
1654 # only uses its cache values. self._trim() will be called later to enforce
1655 # real trimming but doing this quick version here makes it possible to map
1656 # an isolated that is larger than the current amount of free disk space when
1657 # the cache size is already large.
1658 while (
1659 self.policies.min_free_space and
1660 self._lru and
1661 self._free_disk < self.policies.min_free_space):
1662 self._remove_lru_file()
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001663
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001664 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1665 """Deletes cache file from the file system."""
1666 self._lock.assert_locked()
1667 try:
1668 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001669 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001670 file_path.try_remove(self._path(digest))
maruel064c0a32016-04-05 11:47:15 -07001671 self._evicted.append(size)
maruel083fa552016-04-08 14:38:01 -07001672 self._free_disk += size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001673 except OSError as e:
1674 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1675
1676
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    # Command to run, as a list of arguments; filled in by _update_self().
    self.command = []
    # Maps relative file path -> properties dict (keys like 'h', 's', 'm',
    # 'l'); filled in by _start_fetching_files().
    self.files = {}
    # Copied from the first *.isolated that defines 'read_only'; None until
    # then.
    self.read_only = None
    # Copied from the first *.isolated that defines 'relative_cwd'; normalized
    # to '' at the end of fetch().
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already being fetched.
    processed = set()

    def retrieve_async(isolated_file):
      # Enqueues one *.isolated file for fetching, at high priority so includes
      # beat data files in the queue.
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1798
1799
def set_storage_api_class(cls):
  """Replaces StorageApi implementation used by default.

  May be called at most once (asserts the override is not already set), before
  any call to get_storage_api().

  Arguments:
    cls: StorageApi subclass to instantiate instead of the default backend.
  """
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls
1806
1807
def get_storage_api(url, namespace):
  """Constructs a low-level StorageApi implementation for one namespace.

  Used by Storage to work with a single isolate |namespace|. Client code
  should rarely call it directly; see 'get_storage' for a better alternative.

  Arguments:
    url: URL of the isolate service providing the shared cloud based storage.
    namespace: isolate namespace to operate in; also defines the hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of a StorageApi subclass.
  """
  # Honor an override installed through set_storage_api_class(), if any.
  if _storage_api_cls:
    return _storage_api_cls(url, namespace)
  return IsolateServer(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001826
1827
def get_storage(url, namespace):
  """Returns a Storage that can upload to and download from |namespace|.

  Arguments:
    url: URL of the isolate service providing the shared cloud based storage.
    namespace: isolate namespace to operate in; also defines the hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage wrapping the namespace's StorageApi.
  """
  api = get_storage_api(url, namespace)
  return Storage(api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001841
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001842
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Build the list of FileItem to push, dropping duplicates and symlinks
  # (symlinks are not represented by items on the isolate server side).
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' in metadata or filepath in seen:
      skipped += 1
      continue
    seen.add(filepath)
    items.append(FileItem(
        path=filepath,
        digest=metadata['h'],
        size=metadata['s'],
        high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001873
1874
def fetch_isolated(isolated_hash, storage, cache, outdir):
  """Aggressively downloads the .isolated file(s), then download all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.

  Raises:
    isolated_format.MappingError if |isolated_hash| is neither a valid hash nor
    a readable local file, or if the cache is too small to hold everything.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s)', isolated_hash, storage, cache, outdir)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  # The cache is used as a context manager; implementations trim/save their
  # state on exit (see DiskCache.__exit__).
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intent to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      file_path.ensure_tree(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      file_path.ensure_tree(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
          fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          # props.get('m') is the optional file mode for the entry.
          for filepath, props in remaining.pop(digest):
            cache.hardlink(
                digest, os.path.join(outdir, filepath), props.get('m'))

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001955
1956
def directory_to_metadata(root, algo, blacklist):
  """Walks a directory and returns (FileItem list, .isolated metadata dict).

  Arguments:
    root: directory to scan.
    algo: hashing algorithm used to digest the files.
    blacklist: function returning True for relative paths to skip.
  """
  root = file_path.get_native_path_case(root)
  # Symlinks are only followed on non-Windows platforms.
  follow_symlinks = sys.platform != 'win32'
  relpaths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, follow_symlinks)
  metadata = {}
  for relpath in relpaths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo)
    # The timestamp entry is not wanted in the archived metadata.
    meta.pop('t')
    metadata[relpath] = meta
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      # Entries without a digest (e.g. symlinks) have nothing to upload.
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
1978
1979
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Files are hashed locally first; directories are archived by synthesizing a
  temporary .isolated file describing their content, which is uploaded along
  with the directory's files.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.

  Raises:
    Error: on duplicate input paths, paths that are neither file nor
        directory, or OS-level failures while processing a path.
  """
  assert all(isinstance(i, unicode) for i in files), files
  # Reject the same path given twice (possibly spelled differently).
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          # Only the path is needed; the file is written via save_isolated().
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          # The generated .isolated file is high priority so the server can
          # start processing it as soon as possible.
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          # The directory entry maps to the hash of its .isolated file.
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file or directory.' % f)
      except OSError:
        # Convert filesystem failures into the tool's own error type.
        raise Error('Failed to process %s.' % f)
    # NOTE(review): presumably upload_items() returns the subset the server
    # did not already have ("cold"); the rest were cache hits ("hot").
    # Verify against Storage.upload_items().
    uploaded = storage.upload_items(items_to_upload)
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    # Always clean up the temporary .isolated files, even on failure.
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002054
2055
def archive(out, namespace, files, blacklist):
  """Archives files or directories to the Isolate Server and prints hashes.

  Arguments:
    out: URL of the isolate server.
    namespace: namespace to upload into.
    files: list of paths to upload; ['-'] means read one path per stdin line.
    blacklist: list of regexps used to filter out files inside directories.

  Raises:
    Error: if there is nothing to upload or archival fails.
  """
  if files == ['-']:
    # Strip the line terminator explicitly; readlines() keeps the trailing
    # '\n' (and '\r' on Windows), which would become part of the path and
    # make the file lookup fail.
    files = [l.rstrip('\n\r') for l in sys.stdin]

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Only the (hash, path) pairs are needed here; cold/hot stats are ignored.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
2069
2070
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created the whole directory
  is uploaded. Then this .isolated file can be included in another one to run
  commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  # Register the flags this command understands, then parse.
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(
        options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    # Surface archival failures as a command line error (exits the process).
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002092
2093
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True, True)
  # --isolated and --file are mutually exclusive and exactly one is required.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')

  cache = process_cache_options(options)
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    # Refuse to extract a tree over an existing file or a non-empty directory.
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      # Maps digest -> destination path, drained as fetches complete.
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target)
      if bundle.command:
        # NOTE(review): |rel| already includes options.target (which is
        # absolute), so the extra os.path.join below is a no-op — os.path.join
        # returns the second argument when it is absolute. Looks redundant;
        # confirm before simplifying.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' %
              os.path.join(options.target, rel))
        print(' ' + ' '.join(bundle.command))

  return 0
2161
2162
def add_archive_options(parser):
  """Registers the archival-related command line flags on |parser|."""
  # Copy the default so later appends never mutate DEFAULT_BLACKLIST itself.
  parser.add_option(
      '--blacklist',
      action='append',
      default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
2169
2170
def add_isolate_server_options(parser):
  """Registers -I/--isolate-server and --namespace on |parser|."""
  # Fall back to the environment when the flag is not passed explicitly.
  default_server = os.environ.get('ISOLATE_SERVER', '')
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL',
      default=default_server,
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace',
      default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
2182
2183
def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Validates --isolate-server and sets up the server connection plumbing.

  Normalizes the URL in place, optionally installs the exception reporter,
  then logs in. Returns the identity as determined by the server, or None
  when no server was specified and one is not required.
  """
  server = options.isolate_server
  if not server:
    if required:
      parser.error('--isolate-server is required.')
    return None

  try:
    server = net.fix_url(server)
  except ValueError as e:
    parser.error('--isolate-server %s' % e)
  options.isolate_server = server
  if set_exception_handler:
    on_error.report_on_exception_exit(server)
  try:
    return auth.ensure_logged_in(server)
  except ValueError as e:
    # Authentication configuration problems are reported as flag errors.
    parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002205
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002206
def add_cache_options(parser):
  """Adds the local cache management options to |parser| in their own group."""
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  # Default cap: 50 GiB of cached content.
  group.add_option(
      '--max-cache-size', type='int', metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  # Default floor: keep at least 2 GiB of disk free.
  group.add_option(
      '--min-free-space', type='int', metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items', type='int', metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache '
           'default=%default')
  parser.add_option_group(group)
2234
2235
def process_cache_options(options):
  """Returns the cache instance matching the parsed cache options.

  A DiskCache rooted at --cache when one was requested, otherwise a purely
  in-memory cache.
  """
  if not options.cache:
    return MemoryCache()
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  # |options.cache| path may not exist until DiskCache() instance is created.
  return DiskCache(
      unicode(os.path.abspath(options.cache)),
      policies,
      isolated_format.get_hash_algo(options.namespace))
2248
2249
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  """Option parser with logging, version and authentication flags built in."""

  def __init__(self, **kwargs):
    prog = os.path.basename(sys.modules[__name__].__file__)
    logging_utils.OptionParserWithLogging.__init__(
        self, version=__version__, prog=prog, **kwargs)
    # Every subcommand accepts the shared authentication flags.
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    """Parses the arguments, then applies the authentication options."""
    options, positional = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, positional
2264
2265
def main(args):
  """Routes execution to the requested subcommand; returns its exit code."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002269
2270
if __name__ == '__main__':
  # Prevent the OS from showing crash/error dialogs for child processes
  # (mainly a Windows concern) — presumably; confirm in subprocess42.
  subprocess42.inhibit_os_error_reporting()
  # Normalize stdout/stderr encoding so unicode paths print correctly.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # Enables ANSI escape code handling on Windows consoles.
  colorama.init()
  sys.exit(main(sys.argv[1:]))