#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed by the Apache v2.0 license that can be
# found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.4.7'

import base64
import functools
import logging
import optparse
import os
import re
import signal
import sys
import tempfile
import threading
import time
import types
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60

# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0]
# files are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  filedir = os.path.dirname(path)
  if not fs.isdir(filedir):
    fs.makedirs(filedir)
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total

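# A minimal usage sketch for the helpers above (illustrative only; the paths
# are hypothetical):
#
#   total = file_write('/tmp/copy.bin', file_read('/tmp/orig.bin'))
#   assert total == fs.stat('/tmp/orig.bin').st_size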

def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  a zip bomb file doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

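# Round-trip sketch (illustrative only):
#
#   blob = 'x' * (1024 * 1024)
#   compressed = ''.join(zip_compress([blob], level=7))
#   assert ''.join(zip_decompress([compressed])) == blob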

def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # Strip the leading dot so the extension matches the entries in
  # ALREADY_COMPRESSED_TYPES ('zip', not '.zip'); otherwise the lookup never
  # succeeds.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      fs.mkdir(os.path.join(base_directory, d))


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    # os.symlink() doesn't exist on Windows.
    os.symlink(properties['l'], outfile)  # pylint: disable=E1101

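# Illustrative: for files ['a/b/c.txt', 'a/d.txt'], create_directories() makes
# <base>/a and <base>/a/b; create_symlinks() then materializes entries whose
# properties carry an 'l' (symlink target) key, e.g.:
#
#   create_directories('/tmp/out', ['a/b/c.txt', 'a/d.txt'])
#   create_symlinks('/tmp/out', [('a/link', {'l': 'b/c.txt'})])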

def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  actual_size = fs.stat(path).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to the 'contains' thread, then to the 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate the digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]

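# Illustrative: the two concrete Item flavors above are constructed like this
# (the path is hypothetical; digests are computed lazily by prepare()):
#
#   items = [
#       FileItem('/path/to/file.bin'),
#       BufferItem('raw bytes already in memory', high_priority=True),
#   ]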

class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False
415
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000416 def upload_items(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800417 """Uploads a bunch of items to the isolate server.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000418
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800419 It figures out what items are missing from the server and uploads only them.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000420
421 Arguments:
422 items: list of Item instances that represents data to upload.
423
424 Returns:
425 List of items that were uploaded. All other items are already there.
426 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700427 logging.info('upload_items(items=%d)', len(items))
428
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800429 # Ensure all digests are calculated.
430 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700431 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800432
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000433 # For each digest keep only first Item that matches it. All other items
434 # are just indistinguishable copies from the point of view of isolate
435 # server (it doesn't care about paths at all, only content and digests).
436 seen = {}
437 duplicates = 0
438 for item in items:
439 if seen.setdefault(item.digest, item) is not item:
440 duplicates += 1
441 items = seen.values()
442 if duplicates:
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700443 logging.info('Skipped %d files with duplicated content', duplicates)
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000444
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000445 # Enqueue all upload tasks.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000446 missing = set()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000447 uploaded = []
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800448 channel = threading_utils.TaskChannel()
449 for missing_item, push_state in self.get_missing_items(items):
450 missing.add(missing_item)
451 self.async_push(channel, missing_item, push_state)
452
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000453 # No need to spawn deadlock detector thread if there's nothing to upload.
454 if missing:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700455 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000456 # Wait for all started uploads to finish.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000457 while len(uploaded) != len(missing):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000458 detector.ping()
459 item = channel.pull()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000460 uploaded.append(item)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000461 logging.debug(
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000462 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000463 logging.info('All files are uploaded')
464
465 # Print stats.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000466 total = len(items)
467 total_size = sum(f.size for f in items)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000468 logging.info(
469 'Total: %6d, %9.1fkb',
470 total,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000471 total_size / 1024.)
472 cache_hit = set(items) - missing
473 cache_hit_size = sum(f.size for f in cache_hit)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000474 logging.info(
475 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
476 len(cache_hit),
477 cache_hit_size / 1024.,
478 len(cache_hit) * 100. / total,
479 cache_hit_size * 100. / total_size if total_size else 0)
480 cache_miss = missing
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000481 cache_miss_size = sum(f.size for f in cache_miss)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000482 logging.info(
483 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
484 len(cache_miss),
485 cache_miss_size / 1024.,
486 len(cache_miss) * 100. / total,
487 cache_miss_size * 100. / total_size if total_size else 0)
488
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000489 return uploaded
490
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    a 'get_missing_items' call. 'get_missing_items' returns a |push_state|
    object that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state

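# A minimal end-to-end sketch (illustrative; assumes a StorageApi instance,
# e.g. an IsolateServer, is already constructed):
#
#   with Storage(storage_api) as storage:
#     uploaded = storage.upload_items(
#         [FileItem('/path/to/file.bin'), BufferItem('in-memory blob')])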

def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries

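# Illustrative: for 135 items the batches follow the ITEMS_PER_CONTAINS_QUERIES
# schedule: 20, 20, 50 and then the remaining 45, with the largest (most
# likely changed) files checked first.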

class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())

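# A minimal usage sketch (illustrative; assumes a LocalCache implementation
# and a digest of an item known to the server):
#
#   queue = FetchQueue(storage, cache)
#   queue.add(digest, size)
#   queue.wait([digest])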

class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious of the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = '_ah/api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self.do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset| is set, verify the server respects it by checking
    # Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    if isinstance(content, (basestring, list)):
      # Memory is already used, too late.
      with self._lock:
        self._memory_use += push_state.size
    else:
      # TODO(vadimsh): Do not read from |content| generator when retrying push.
      # If |content| is indeed a generator, it cannot be rewound back to the
      # beginning of the stream. A retry will find it exhausted. A possible
      # solution is to wrap |content| generator with some sort of caching
      # restartable generator. It should be done alongside streaming support
      # implementation.
      #
      # In theory, we should keep the generator, so that it is not serialized
      # in memory. Sadly net.HttpService.request() requires the body to be
      # serialized.
      assert isinstance(content, types.GeneratorType), repr(content)
      slept = False
      # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
      # One byte less than 512mb. This is to cope with incompressible content.
      max_size = int(sys.maxsize * 0.25)
      while True:
        with self._lock:
          # This is due to 32 bits python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since it's what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling one upload at once, we can survive. Otherwise, kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below, no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001088 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001089 # Ensure all items were initialized with 'prepare' call. Storage does that.
1090 assert all(i.digest is not None and i.size is not None for i in items)
1091
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001092 # Request body is a json encoded list of dicts.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001093 body = {
1094 'items': [
1095 {
1096 'digest': item.digest,
1097 'is_isolated': bool(item.high_priority),
1098 'size': item.size,
1099 } for item in items
1100 ],
1101 'namespace': self._namespace_dict,
1102 }
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001103
Cory Massarocc19c8c2015-03-10 13:35:11 -07001104 query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001105
1106 # Response body is a list of push_urls (or null if file is already present).
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001107 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001108 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001109 response = net.url_read_json(url=query_url, data=body)
1110 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001111 raise isolated_format.MappingError(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001112 'Failed to execute preupload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001113 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001114 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001115 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001116
1117 # Pick Items that are missing, attach _PushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001118 missing_items = {}
Cory Massarocc19c8c2015-03-10 13:35:11 -07001119 for preupload_status in response.get('items', []):
1120 assert 'upload_ticket' in preupload_status, (
1121 preupload_status, '/preupload did not generate an upload ticket')
1122 index = int(preupload_status['index'])
1123 missing_items[items[index]] = _IsolateServerPushState(
1124 preupload_status, items[index].size)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001125    logging.info('Queried %d files, %d cache hits',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001126 len(items), len(items) - len(missing_items))
1127 return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001128
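  # For illustration (hypothetical digest values), the preupload exchange in
  # contains() above looks like:
  #   request:  {'items': [{'digest': 'deadbeef', 'is_isolated': False,
  #                         'size': 123}],
  #              'namespace': {...}}
  #   response: {'items': [{'index': '0', 'upload_ticket': '...'}]}
  # Files absent from response['items'] are already on the server.
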
Cory Massarocc19c8c2015-03-10 13:35:11 -07001129 def do_fetch(self, url, digest, offset):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001130 """Fetches isolated data from the URL.
1131
1132 Used only for fetching files, not for API calls. Can be overridden in
1133 subclasses.
1134
1135 Args:
1136      url: URL to fetch the data from, can possibly return an HTTP redirect.
      digest: hash digest of the item to fetch.
1137      offset: byte offset inside the file to start fetching from.
1138
1139 Returns:
1140 net.HttpResponse compatible object, with 'read' and 'get_header' calls.
1141 """
Cory Massarocc19c8c2015-03-10 13:35:11 -07001142 assert isinstance(offset, int)
1143 data = {
1144 'digest': digest.encode('utf-8'),
1145 'namespace': self._namespace_dict,
1146 'offset': offset,
1147 }
maruel0c25f4f2015-12-15 05:41:17 -08001148 # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
1149 # is added.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001150 return net.url_read_json(
1151 url=url,
1152 data=data,
1153 read_timeout=DOWNLOAD_READ_TIMEOUT)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001154
Cory Massarocc19c8c2015-03-10 13:35:11 -07001155 def do_push(self, push_state, content):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001156 """Uploads isolated file to the URL.
1157
1158 Used only for storing files, not for API calls. Can be overridden in
1159 subclasses.
1160
1161 Args:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001163      push_state: an _IsolateServerPushState instance.
Vadim Shtayura8623c272014-12-01 11:45:27 -08001165 content: an iterable that yields 'str' chunks.
Vadim Shtayura8623c272014-12-01 11:45:27 -08001166 """
1167 # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
1168 # upload support is implemented.
1169 if isinstance(content, list) and len(content) == 1:
1170 content = content[0]
1171 else:
1172 content = ''.join(content)
Cory Massarocc19c8c2015-03-10 13:35:11 -07001173
1174 # DB upload
1175 if not push_state.finalize_url:
1176 url = '%s/%s' % (self._base_url, push_state.upload_url)
1177 content = base64.b64encode(content)
1178 data = {
1179 'upload_ticket': push_state.preupload_status['upload_ticket'],
1180 'content': content,
1181 }
1182 response = net.url_read_json(url=url, data=data)
1183 return response is not None and response['ok']
1184
1185 # upload to GS
1186 url = push_state.upload_url
Vadim Shtayura8623c272014-12-01 11:45:27 -08001187 response = net.url_read(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001188 content_type='application/octet-stream',
1189 data=content,
1190 method='PUT',
tandriib44d54d2016-02-10 11:31:41 -08001191 headers={'Cache-Control': 'public, max-age=31536000'},
Cory Massarocc19c8c2015-03-10 13:35:11 -07001192 url=url)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001193 return response is not None
1194
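  # To summarize do_push(): items the server routed to the DB path (no
  # finalize_url) are POSTed as JSON with base64 'content' and the
  # 'upload_ticket', while items routed to Google Storage are PUT as raw
  # bytes to the signed upload_url and finalized separately in push().
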
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001195
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001196class LocalCache(object):
1197 """Local cache that stores objects fetched via Storage.
1198
1199 It can be accessed concurrently from multiple threads, so it should protect
1200 its internal state with some lock.
1201 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001202 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001203
maruel064c0a32016-04-05 11:47:15 -07001204 def __init__(self):
1205 self._lock = threading_utils.LockWithAssert()
1206 # Profiling values.
1207 self._added = []
1208 self._initial_number_items = 0
1209 self._initial_size = 0
1210 self._evicted = []
1211 self._linked = []
1212
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001213 def __enter__(self):
1214 """Context manager interface."""
1215 return self
1216
1217 def __exit__(self, _exc_type, _exec_value, _traceback):
1218 """Context manager interface."""
1219 return False
1220
maruel064c0a32016-04-05 11:47:15 -07001221 @property
1222 def added(self):
1223 return self._added[:]
1224
1225 @property
1226 def evicted(self):
1227 return self._evicted[:]
1228
1229 @property
1230 def initial_number_items(self):
1231 return self._initial_number_items
1232
1233 @property
1234 def initial_size(self):
1235 return self._initial_size
1236
1237 @property
1238 def linked(self):
1239 return self._linked[:]
1240
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001241 def cached_set(self):
1242 """Returns a set of all cached digests (always a new object)."""
1243 raise NotImplementedError()
1244
maruel36a963d2016-04-08 17:15:49 -07001245 def cleanup(self):
1246 """Deletes any corrupted item from the cache and trims it if necessary."""
1247 raise NotImplementedError()
1248
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001249 def touch(self, digest, size):
1250 """Ensures item is not corrupted and updates its LRU position.
1251
1252 Arguments:
1253 digest: hash digest of item to check.
1254 size: expected size of this item.
1255
1256 Returns:
1257 True if item is in cache and not corrupted.
1258 """
1259 raise NotImplementedError()
1260
1261 def evict(self, digest):
1262 """Removes item from cache if it's there."""
1263 raise NotImplementedError()
1264
1265 def read(self, digest):
1266 """Returns contents of the cached item as a single str."""
1267 raise NotImplementedError()
1268
1269 def write(self, digest, content):
maruel083fa552016-04-08 14:38:01 -07001270 """Reads data from |content| generator and stores it in cache.
1271
1272 Returns digest to simplify chaining.
1273 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001274 raise NotImplementedError()
1275
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001276 def hardlink(self, digest, dest, file_mode):
1277 """Ensures file at |dest| has same content as cached |digest|.
1278
1279 If file_mode is provided, it is used to set the executable bit if
1280 applicable.
1281 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001282 raise NotImplementedError()
1283
1284
1285class MemoryCache(LocalCache):
1286 """LocalCache implementation that stores everything in memory."""
1287
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001288 def __init__(self, file_mode_mask=0500):
1289 """Args:
1290 file_mode_mask: bit mask to AND file mode with. Default value will make
1291 all mapped files to be read only.
1292 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001293 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001294 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001295 self._contents = {}
1296
1297 def cached_set(self):
1298 with self._lock:
1299 return set(self._contents)
1300
maruel36a963d2016-04-08 17:15:49 -07001301 def cleanup(self):
1302 pass
1303
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001304 def touch(self, digest, size):
1305 with self._lock:
1306 return digest in self._contents
1307
1308 def evict(self, digest):
1309 with self._lock:
maruel064c0a32016-04-05 11:47:15 -07001310 v = self._contents.pop(digest, None)
1311 if v is not None:
1312        self._evicted.append(len(v))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001313
1314 def read(self, digest):
1315 with self._lock:
1316 return self._contents[digest]
1317
1318 def write(self, digest, content):
1319 # Assemble whole stream before taking the lock.
1320 data = ''.join(content)
1321 with self._lock:
1322 self._contents[digest] = data
maruel064c0a32016-04-05 11:47:15 -07001323 self._added.append(len(data))
maruel083fa552016-04-08 14:38:01 -07001324 return digest
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001325
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001326 def hardlink(self, digest, dest, file_mode):
1327    """Since data is kept in memory, there is no file node to hardlink."""
maruel064c0a32016-04-05 11:47:15 -07001328 data = self.read(digest)
1329 file_write(dest, [data])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001330 if file_mode is not None:
maruel12e30012015-10-09 11:55:35 -07001331 fs.chmod(dest, file_mode & self._file_mode_mask)
maruel064c0a32016-04-05 11:47:15 -07001332 with self._lock:
1333 self._linked.append(len(data))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001334
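# Sketch: a MemoryCache round-trip. The digest value is hypothetical; real
# callers use digests produced by the namespace's hash algorithm.
def _example_memory_cache():
  with MemoryCache() as cache:
    cache.write('0' * 40, ['hello ', 'world'])
    return cache.read('0' * 40)  # 'hello world'
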
1335
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001336class CachePolicies(object):
1337 def __init__(self, max_cache_size, min_free_space, max_items):
1338 """
1339 Arguments:
1340 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1341 cache is effectively a leak.
1342 - min_free_space: Trim if disk free space becomes lower than this value. If
1343        0, it unconditionally fills the disk.
1344 - max_items: Maximum number of items to keep in the cache. If 0, do not
1345 enforce a limit.
1346 """
1347 self.max_cache_size = max_cache_size
1348 self.min_free_space = min_free_space
1349 self.max_items = max_items
1350
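# Sketch: policies matching the defaults exposed by add_cache_options() below
# (50GiB maximum cache size, 2GiB minimum free disk, 100000 items).
def _example_cache_policies():
  return CachePolicies(
      max_cache_size=50*1024*1024*1024,
      min_free_space=2*1024*1024*1024,
      max_items=100000)
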
1351
1352class DiskCache(LocalCache):
1353 """Stateful LRU cache in a flat hash table in a directory.
1354
1355 Saves its state as json file.
1356  Saves its state as a json file.
maruel12e30012015-10-09 11:55:35 -07001357 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001358
1359 def __init__(self, cache_dir, policies, hash_algo):
1360 """
1361 Arguments:
1362 cache_dir: directory where to place the cache.
1363 policies: cache retention policies.
1364      hash_algo: hashing algorithm used.
1365 """
maruel064c0a32016-04-05 11:47:15 -07001366 # All protected methods (starting with '_') except _path should be called
1367 # with self._lock held.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001368 super(DiskCache, self).__init__()
1369 self.cache_dir = cache_dir
1370 self.policies = policies
1371 self.hash_algo = hash_algo
1372 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
maruel083fa552016-04-08 14:38:01 -07001373 # Items in a LRU lookup dict(digest: size).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001374 self._lru = lru.LRUDict()
maruel083fa552016-04-08 14:38:01 -07001375 # Current cached free disk space. It is updated by self._trim().
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001376 self._free_disk = 0
maruel083fa552016-04-08 14:38:01 -07001377 # The items that must not be evicted during this run since they were
1378 # referenced.
1379 self._protected = set()
maruel36a963d2016-04-08 17:15:49 -07001380 # Cleanup operations done by self._load(), if any.
1381 self._operations = []
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001382 with tools.Profiler('Setup'):
1383 with self._lock:
maruel083fa552016-04-08 14:38:01 -07001384 # self._load() calls self._trim() which initializes self._free_disk.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001385 self._load()
1386
1387 def __enter__(self):
1388 return self
1389
1390 def __exit__(self, _exc_type, _exec_value, _traceback):
1391 with tools.Profiler('CleanupTrimming'):
1392 with self._lock:
1393 self._trim()
1394
1395 logging.info(
1396 '%5d (%8dkb) added',
1397 len(self._added), sum(self._added) / 1024)
1398 logging.info(
1399 '%5d (%8dkb) current',
1400 len(self._lru),
1401 sum(self._lru.itervalues()) / 1024)
1402 logging.info(
maruel064c0a32016-04-05 11:47:15 -07001403 '%5d (%8dkb) evicted',
1404 len(self._evicted), sum(self._evicted) / 1024)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001405 logging.info(
1406 ' %8dkb free',
1407 self._free_disk / 1024)
1408 return False
1409
1410 def cached_set(self):
1411 with self._lock:
1412 return self._lru.keys_set()
1413
maruel36a963d2016-04-08 17:15:49 -07001414 def cleanup(self):
1415    # At this point, the cache has already been loaded and trimmed to respect
1416    # cache policies, and invalid files have been deleted.
1417 if self._evicted:
1418 logging.info(
1419 'Evicted items with the following sizes: %s', sorted(self._evicted))
1420
1421 # What remains to be done is to hash every single item to
1422 # detect corruption, then save to ensure state.json is up to date.
1423    # Sadly, on a 50GiB cache with 100MiB/s I/O, this is still over 8 minutes.
1424 # TODO(maruel): Let's revisit once directory metadata is stored in
1425 # state.json so only the files that had been mapped since the last cleanup()
1426 # call are manually verified.
1427 #
1428 #with self._lock:
1429 # for digest in self._lru:
1430 # if not isolated_format.is_valid_hash(
1431 # self._path(digest), self.hash_algo):
1432 # self.evict(digest)
1433 # logging.info('Deleted corrupted item: %s', digest)
1434
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001435 def touch(self, digest, size):
1436 """Verifies an actual file is valid.
1437
1438    Note that it doesn't compute the hash, so the file could still be corrupted
1439    if its size didn't change.
1440
1441 TODO(maruel): More stringent verification while keeping the check fast.
1442 """
1443 # Do the check outside the lock.
1444 if not is_valid_file(self._path(digest), size):
1445 return False
1446
1447    # Update its LRU position.
1448 with self._lock:
1449 if digest not in self._lru:
1450 return False
1451 self._lru.touch(digest)
maruel083fa552016-04-08 14:38:01 -07001452 self._protected.add(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001453 return True
1454
1455 def evict(self, digest):
1456 with self._lock:
maruel083fa552016-04-08 14:38:01 -07001457 # Do not check for 'digest in self._protected' since it could be because
1458 # the object is corrupted.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001459 self._lru.pop(digest)
1460 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1461
1462 def read(self, digest):
maruel12e30012015-10-09 11:55:35 -07001463 with fs.open(self._path(digest), 'rb') as f:
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001464 return f.read()
1465
1466 def write(self, digest, content):
Marc-Antoine Rueldf4976d2015-04-15 19:56:21 -04001467 assert content is not None
maruel083fa552016-04-08 14:38:01 -07001468 with self._lock:
1469 self._protected.add(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001470 path = self._path(digest)
1471    # A stale broken file may remain. It is possible for the file to have its
1472    # write access bit removed, which would cause the file_write() call to fail
1473    # to open in write mode. Take no chance here.
1474 file_path.try_remove(path)
1475 try:
1476 size = file_write(path, content)
1477 except:
1478 # There are two possible places were an exception can occur:
1479 # 1) Inside |content| generator in case of network or unzipping errors.
1480 # 2) Inside file_write itself in case of disk IO errors.
1481 # In any case delete an incomplete file and propagate the exception to
1482 # caller, it will be logged there.
1483 file_path.try_remove(path)
1484 raise
1485 # Make the file read-only in the cache. This has a few side-effects since
1486    # the file node is modified, so every directory entry to this file becomes
1487 # read-only. It's fine here because it is a new file.
1488 file_path.set_read_only(path, True)
1489 with self._lock:
1490 self._add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001491 return digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001492
1493 def hardlink(self, digest, dest, file_mode):
1494 """Hardlinks the file to |dest|.
1495
1496 Note that the file permission bits are on the file node, not the directory
1497 entry, so changing the access bit on any of the directory entries for the
1498 file node will affect them all.
1499 """
1500 path = self._path(digest)
maruel1f7e8162015-09-16 10:35:43 -07001501 if not file_path.link_file(dest, path, file_path.HARDLINK_WITH_FALLBACK):
1502 # Report to the server that it failed with more details. We'll want to
1503 # squash them all.
1504 on_error.report('Failed to hardlink\n%s -> %s' % (path, dest))
1505
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001506 if file_mode is not None:
1507 # Ignores all other bits.
maruel12e30012015-10-09 11:55:35 -07001508 fs.chmod(dest, file_mode & 0500)
maruel064c0a32016-04-05 11:47:15 -07001509 with self._lock:
1510 self._linked.append(self._lru[digest])
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001511
1512 def _load(self):
1513 """Loads state of the cache from json file."""
1514 self._lock.assert_locked()
1515
1516 if not os.path.isdir(self.cache_dir):
maruel12e30012015-10-09 11:55:35 -07001517 fs.makedirs(self.cache_dir)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001518 else:
1519 # Make sure the cache is read-only.
1520 # TODO(maruel): Calculate the cost and optimize the performance
1521 # accordingly.
1522 file_path.make_tree_read_only(self.cache_dir)
1523
1524 # Load state of the cache.
maruel12e30012015-10-09 11:55:35 -07001525 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001526 try:
1527 self._lru = lru.LRUDict.load(self.state_file)
1528 except ValueError as err:
1529 logging.error('Failed to load cache state: %s' % (err,))
1530 # Don't want to keep broken state file.
1531 file_path.try_remove(self.state_file)
1532
1533 # Ensure that all files listed in the state still exist and add new ones.
1534 previous = self._lru.keys_set()
1535 unknown = []
maruel12e30012015-10-09 11:55:35 -07001536 for filename in fs.listdir(self.cache_dir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001537 if filename == self.STATE_FILE:
1538 continue
1539 if filename in previous:
maruel064c0a32016-04-05 11:47:15 -07001540 self._initial_size += self._lru[filename]
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001541 previous.remove(filename)
maruel064c0a32016-04-05 11:47:15 -07001542 self._initial_number_items += 1
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001543 continue
1544 # An untracked file.
1545 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1546 logging.warning('Removing unknown file %s from cache', filename)
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001547 p = self._path(filename)
maruel12e30012015-10-09 11:55:35 -07001548 if fs.isdir(p):
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001549 try:
1550 file_path.rmtree(p)
1551 except OSError:
1552 pass
1553 else:
1554 file_path.try_remove(p)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001555 continue
1556 # File that's not referenced in 'state.json'.
1557 # TODO(vadimsh): Verify its SHA1 matches file name.
1558 logging.warning('Adding unknown file %s to cache', filename)
1559 unknown.append(filename)
1560
1561 if unknown:
1562 # Add as oldest files. They will be deleted eventually if not accessed.
maruel064c0a32016-04-05 11:47:15 -07001563 pairs = []
1564 for digest in unknown:
1565 size = fs.stat(self._path(digest)).st_size
1566 self._initial_size += size
1567 self._initial_number_items += 1
1568 pairs.append((digest, size))
1569 self._lru.batch_insert_oldest(pairs)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001570 logging.warning('Added back %d unknown files', len(unknown))
1571
1572 if previous:
1573 # Filter out entries that were not found.
1574 logging.warning('Removed %d lost files', len(previous))
1575 for filename in previous:
1576 self._lru.pop(filename)
1577 self._trim()
1578
1579 def _save(self):
1580 """Saves the LRU ordering."""
1581 self._lock.assert_locked()
1582 if sys.platform != 'win32':
1583 d = os.path.dirname(self.state_file)
maruel12e30012015-10-09 11:55:35 -07001584 if fs.isdir(d):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001585 # Necessary otherwise the file can't be created.
1586 file_path.set_read_only(d, False)
maruel12e30012015-10-09 11:55:35 -07001587 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001588 file_path.set_read_only(self.state_file, False)
1589 self._lru.save(self.state_file)
1590
1591 def _trim(self):
1592    """Trims anything we don't know and makes sure enough free space exists."""
1593 self._lock.assert_locked()
1594
1595 # Ensure maximum cache size.
1596 if self.policies.max_cache_size:
1597 total_size = sum(self._lru.itervalues())
1598 while total_size > self.policies.max_cache_size:
1599 total_size -= self._remove_lru_file()
1600
1601 # Ensure maximum number of items in the cache.
1602 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1603 for _ in xrange(len(self._lru) - self.policies.max_items):
1604 self._remove_lru_file()
1605
1606 # Ensure enough free space.
1607 self._free_disk = file_path.get_free_space(self.cache_dir)
1608 trimmed_due_to_space = False
1609 while (
1610 self.policies.min_free_space and
1611 self._lru and
1612 self._free_disk < self.policies.min_free_space):
1613 trimmed_due_to_space = True
1614 self._remove_lru_file()
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001615 if trimmed_due_to_space:
1616 total_usage = sum(self._lru.itervalues())
1617 usage_percent = 0.
1618 if total_usage:
1619 usage_percent = 100. * self.policies.max_cache_size / float(total_usage)
1620 logging.warning(
1621 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1622 'cache (%.1f%% of its maximum capacity)',
1623 self._free_disk / 1024.,
1624 total_usage / 1024.,
1625 usage_percent)
1626 self._save()
1627
1628 def _path(self, digest):
1629 """Returns the path to one item."""
1630 return os.path.join(self.cache_dir, digest)
1631
1632 def _remove_lru_file(self):
1633 """Removes the last recently used file and returns its size."""
1634 self._lock.assert_locked()
maruel083fa552016-04-08 14:38:01 -07001635 try:
1636 digest, size = self._lru.get_oldest()
1637 except KeyError:
1638 raise Error('Nothing to remove')
1639 if digest in self._protected:
1640 raise Error('Not enough space to map the whole isolated tree')
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001641 digest, size = self._lru.pop_oldest()
1642 self._delete_file(digest, size)
1643 return size
1644
1645 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1646 """Adds an item into LRU cache marking it as a newest one."""
1647 self._lock.assert_locked()
1648 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001649 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001650 self._added.append(size)
1651 self._lru.add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001652 self._free_disk -= size
1653 # Do a quicker version of self._trim(). It only enforces free disk space,
1654 # not cache size limits. It doesn't actually look at real free disk space,
1655 # only uses its cache values. self._trim() will be called later to enforce
1656 # real trimming but doing this quick version here makes it possible to map
1657 # an isolated that is larger than the current amount of free disk space when
1658 # the cache size is already large.
1659 while (
1660 self.policies.min_free_space and
1661 self._lru and
1662 self._free_disk < self.policies.min_free_space):
1663 self._remove_lru_file()
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001664
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001665 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1666 """Deletes cache file from the file system."""
1667 self._lock.assert_locked()
1668 try:
1669 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001670 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001671 file_path.try_remove(self._path(digest))
maruel064c0a32016-04-05 11:47:15 -07001672 self._evicted.append(size)
maruel083fa552016-04-08 14:38:01 -07001673 self._free_disk += size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001674 except OSError as e:
1675 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1676
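# Sketch: constructing a DiskCache the same way process_cache_options() does
# below. The cache directory name is hypothetical; creating the instance
# loads state.json and trims the cache.
def _example_disk_cache():
  cache = DiskCache(
      unicode(os.path.abspath(u'isolate_cache')),
      _example_cache_policies(),
      isolated_format.get_hash_algo('default-gzip'))
  with cache:
    return cache.cached_set()
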
1677
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001678class IsolatedBundle(object):
1679 """Fetched and parsed .isolated file with all dependencies."""
1680
Vadim Shtayura3148e072014-09-02 18:51:52 -07001681 def __init__(self):
1682 self.command = []
1683 self.files = {}
1684 self.read_only = None
1685 self.relative_cwd = None
1686 # The main .isolated file, a IsolatedFile instance.
1687 self.root = None
1688
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001689 def fetch(self, fetch_queue, root_isolated_hash, algo):
1690 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001691
1692 It enables support for "included" .isolated files. They are processed in
1693 strict order but fetched asynchronously from the cache. This is important so
1694 that a file in an included .isolated file that is overridden by an embedding
1695 .isolated file is not fetched needlessly. The includes are fetched in one
1696 pass and the files are fetched as soon as all the ones on the left-side
1697 of the tree were fetched.
1698
1699 The prioritization is very important here for nested .isolated files.
1700 'includes' have the highest priority and the algorithm is optimized for both
1701 deep and wide trees. A deep one is a long link of .isolated files referenced
1702 one at a time by one item in 'includes'. A wide one has a large number of
1703 'includes' in a single .isolated file. 'left' is defined as an included
1704 .isolated file earlier in the 'includes' list. So the order of the elements
1705 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001706
1707 As a side effect this method starts asynchronous fetch of all data files
1708 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1709 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001710 """
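    # For illustration (hypothetical layout): a "wide" tree is a root whose
    # 'includes' lists many children directly (root -> [a, b, c]); a "deep"
    # tree is a chain (root -> a -> b -> c). In both shapes the *.isolated
    # files themselves are fetched with PRIORITY_HIGH as soon as they are
    # discovered, while a node's data files only start fetching once every
    # include to its left has been processed.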
1711 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1712
1713 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1714 pending = {}
1715 # Set of hashes of already retrieved items to refuse recursive includes.
1716 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001717 # Set of IsolatedFile's whose data files have already being fetched.
1718 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001719
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001720 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001721 h = isolated_file.obj_hash
1722 if h in seen:
1723 raise isolated_format.IsolatedError(
1724 'IsolatedFile %s is retrieved recursively' % h)
1725 assert h not in pending
1726 seen.add(h)
1727 pending[h] = isolated_file
1728 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1729
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001730 # Start fetching root *.isolated file (single file, not the whole bundle).
1731 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001732
1733 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001734 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001735 item_hash = fetch_queue.wait(pending)
1736 item = pending.pop(item_hash)
1737 item.load(fetch_queue.cache.read(item_hash))
Vadim Shtayura3148e072014-09-02 18:51:52 -07001738
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001739 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001740 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001741 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001742
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001743 # Always fetch *.isolated files in traversal order, waiting if necessary
1744 # until next to-be-processed node loads. "Waiting" is done by yielding
1745 # back to the outer loop, that waits until some *.isolated is loaded.
1746 for node in isolated_format.walk_includes(self.root):
1747 if node not in processed:
1748 # Not visited, and not yet loaded -> wait for it to load.
1749 if not node.is_loaded:
1750 break
1751 # Not visited and loaded -> process it and continue the traversal.
1752 self._start_fetching_files(node, fetch_queue)
1753 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001754
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001755 # All *.isolated files should be processed by now and only them.
1756 all_isolateds = set(isolated_format.walk_includes(self.root))
1757 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001758
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001759 # Extract 'command' and other bundle properties.
1760 for node in isolated_format.walk_includes(self.root):
1761 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001762 self.relative_cwd = self.relative_cwd or ''
1763
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001764 def _start_fetching_files(self, isolated, fetch_queue):
1765 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001766
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001767 Modifies self.files.
1768 """
1769 logging.debug('fetch_files(%s)', isolated.obj_hash)
1770 for filepath, properties in isolated.data.get('files', {}).iteritems():
1771 # Root isolated has priority on the files being mapped. In particular,
1772 # overridden files must not be fetched.
1773 if filepath not in self.files:
1774 self.files[filepath] = properties
1775 if 'h' in properties:
1776 # Preemptively request files.
1777 logging.debug('fetching %s', filepath)
1778 fetch_queue.add(
1779 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1780
1781 def _update_self(self, node):
1782 """Extracts bundle global parameters from loaded *.isolated file.
1783
1784 Will be called with each loaded *.isolated file in order of traversal of
1785 isolated include graph (see isolated_format.walk_includes).
1786 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001787 # Grabs properties.
1788 if not self.command and node.data.get('command'):
1789 # Ensure paths are correctly separated on windows.
1790 self.command = node.data['command']
1791 if self.command:
1792 self.command[0] = self.command[0].replace('/', os.path.sep)
1793 self.command = tools.fix_python_path(self.command)
1794 if self.read_only is None and node.data.get('read_only') is not None:
1795 self.read_only = node.data['read_only']
1796 if (self.relative_cwd is None and
1797 node.data.get('relative_cwd') is not None):
1798 self.relative_cwd = node.data['relative_cwd']
1799
1800
Vadim Shtayura8623c272014-12-01 11:45:27 -08001801def set_storage_api_class(cls):
1802 """Replaces StorageApi implementation used by default."""
1803 global _storage_api_cls
1804 assert _storage_api_cls is None
1805 assert issubclass(cls, StorageApi)
1806 _storage_api_cls = cls
1807
1808
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001809def get_storage_api(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001810  """Returns an object that implements the low-level StorageApi interface.
1811
1812  It is used by Storage to work with a single isolate |namespace|. It should
1813 rarely be used directly by clients, see 'get_storage' for
1814 a better alternative.
1815
1816 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001817 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001818 namespace: isolate namespace to operate in, also defines hashing and
1819 compression scheme used, i.e. namespace names that end with '-gzip'
1820 store compressed data.
1821
1822 Returns:
1823 Instance of StorageApi subclass.
1824 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001825 cls = _storage_api_cls or IsolateServer
1826 return cls(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001827
1828
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001829def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001830 """Returns Storage class that can upload and download from |namespace|.
1831
1832 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001833 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001834 namespace: isolate namespace to operate in, also defines hashing and
1835 compression scheme used, i.e. namespace names that end with '-gzip'
1836 store compressed data.
1837
1838 Returns:
1839 Instance of Storage.
1840 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001841 return Storage(get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001842
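# Sketch of the typical entry point; the server URL is hypothetical. Storage
# is a context manager, exactly as upload_tree() and CMDdownload() use it.
def _example_get_storage():
  with get_storage('https://example.appspot.com', 'default-gzip') as storage:
    return storage.hash_algo
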
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001843
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001844def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001845 """Uploads the given tree to the given url.
1846
1847 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001848 base_url: The url of the isolate server to upload to.
1849 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001850 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001851 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001852 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001853 # Filter out symlinks, since they are not represented by items on isolate
1854 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001855 items = []
1856 seen = set()
1857 skipped = 0
1858 for filepath, metadata in infiles:
maruel12e30012015-10-09 11:55:35 -07001859 assert isinstance(filepath, unicode), filepath
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001860 if 'l' not in metadata and filepath not in seen:
1861 seen.add(filepath)
1862 item = FileItem(
1863 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001864 digest=metadata['h'],
1865 size=metadata['s'],
1866 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001867 items.append(item)
1868 else:
1869 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001870
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001871 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001872 with get_storage(base_url, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001873 return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001874
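# For illustration (hypothetical paths and digests), the metadata dicts that
# upload_tree() consumes mirror the .isolated 'files' section: 'h' is the
# digest, 's' the size, 'l' marks a symlink (skipped above) and
# 'priority' == '0' marks a high priority entry:
#
#   infiles = [
#     (u'/abs/foo.isolated', {'h': u'deadbeef', 's': 12, 'priority': '0'}),
#     (u'/abs/link', {'l': u'foo.isolated'}),
#   ]
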
1875
maruelb8d88d12016-04-08 12:54:01 -07001876def fetch_isolated(isolated_hash, storage, cache, outdir):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001877 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001878
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001879 Arguments:
1880 isolated_hash: hash of the root *.isolated file.
1881 storage: Storage class that communicates with isolate storage.
1882 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001883 outdir: Output directory to map file tree to.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001884
1885 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001886 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001887 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001888 logging.debug(
maruelb8d88d12016-04-08 12:54:01 -07001889 'fetch_isolated(%s, %s, %s, %s)', isolated_hash, storage, cache, outdir)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001890 # Hash algorithm to use, defined by namespace |storage| is using.
1891 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001892 with cache:
1893 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001894 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001895
1896 with tools.Profiler('GetIsolateds'):
1897 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001898 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001899 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
maruel1ceb3872015-10-14 06:10:44 -07001900 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001901 try:
maruel1ceb3872015-10-14 06:10:44 -07001902 isolated_hash = fetch_queue.inject_local_file(path, algo)
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001903 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001904 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001905          '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1906 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001907
1908 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001909 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001910
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001911 with tools.Profiler('GetRest'):
1912 # Create file system hierarchy.
maruel12e30012015-10-09 11:55:35 -07001913 if not fs.isdir(outdir):
1914 fs.makedirs(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001915 create_directories(outdir, bundle.files)
1916 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001917
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001918 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001919 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
maruel12e30012015-10-09 11:55:35 -07001920 if not fs.isdir(cwd):
1921 fs.makedirs(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001922
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001923 # Multimap: digest -> list of pairs (path, props).
1924 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001925 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001926 if 'h' in props:
1927 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001928
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001929 # Now block on the remaining files to be downloaded and mapped.
1930 logging.info('Retrieving remaining files (%d of them)...',
1931 fetch_queue.pending_count)
1932 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001933 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001934 while remaining:
1935 detector.ping()
1936
1937 # Wait for any item to finish fetching to cache.
1938 digest = fetch_queue.wait(remaining)
1939
1940 # Link corresponding files to a fetched item in cache.
1941 for filepath, props in remaining.pop(digest):
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001942 cache.hardlink(
1943 digest, os.path.join(outdir, filepath), props.get('m'))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001944
1945 # Report progress.
1946 duration = time.time() - last_update
1947 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1948 msg = '%d files remaining...' % len(remaining)
1949 print msg
1950 logging.info(msg)
1951 last_update = time.time()
1952
1953 # Cache could evict some items we just tried to fetch, it's a fatal error.
1954 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001955 raise isolated_format.MappingError(
1956 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001957 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001958
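# Sketch: fetching a tree without a persistent cache, the same shape as the
# CMDdownload() call below. The output directory is hypothetical.
def _example_fetch_isolated(storage, isolated_hash):
  bundle = fetch_isolated(
      isolated_hash=isolated_hash,
      storage=storage,
      cache=MemoryCache(),
      outdir=unicode(os.path.abspath(u'out')))
  return bundle.command, bundle.relative_cwd
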
1959
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001960def directory_to_metadata(root, algo, blacklist):
1961 """Returns the FileItem list and .isolated metadata for a directory."""
1962 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001963 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001964 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001965 metadata = {
1966 relpath: isolated_format.file_to_metadata(
Marc-Antoine Ruelf1d827c2014-11-24 15:22:25 -05001967 os.path.join(root, relpath), {}, 0, algo)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001968 for relpath in paths
1969 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001970 for v in metadata.itervalues():
1971 v.pop('t')
1972 items = [
1973 FileItem(
1974 path=os.path.join(root, relpath),
1975 digest=meta['h'],
1976 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001977 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001978 for relpath, meta in metadata.iteritems() if 'h' in meta
1979 ]
1980 return items, metadata
1981
1982
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001983def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001984  """Stores every entry and returns the relevant data.
1985
1986 Arguments:
1987 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001988 files: list of file paths to upload. If a directory is specified, a
1989 .isolated file is created and its hash is returned.
1990 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07001991
1992 Returns:
1993 tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
1994 The first file in the first item is always the isolated file.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001995 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001996 assert all(isinstance(i, unicode) for i in files), files
1997 if len(files) != len(set(map(os.path.abspath, files))):
1998 raise Error('Duplicate entries found.')
1999
maruel064c0a32016-04-05 11:47:15 -07002000 # List of tuple(hash, path).
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002001 results = []
2002 # The temporary directory is only created as needed.
2003 tempdir = None
2004 try:
2005 # TODO(maruel): Yield the files to a worker thread.
2006 items_to_upload = []
2007 for f in files:
2008 try:
2009 filepath = os.path.abspath(f)
maruel12e30012015-10-09 11:55:35 -07002010 if fs.isdir(filepath):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002011 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07002012 items, metadata = directory_to_metadata(
2013 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002014
2015 # Create the .isolated file.
2016 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002017 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
2018 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002019 os.close(handle)
2020 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002021 'algo':
2022 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002023 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002024 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002025 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04002026 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002027 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002028 items_to_upload.extend(items)
2029 items_to_upload.append(
2030 FileItem(
2031 path=isolated,
2032 digest=h,
maruel12e30012015-10-09 11:55:35 -07002033 size=fs.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08002034 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002035 results.append((h, f))
2036
maruel12e30012015-10-09 11:55:35 -07002037 elif fs.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002038 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002039 items_to_upload.append(
2040 FileItem(
2041 path=filepath,
2042 digest=h,
maruel12e30012015-10-09 11:55:35 -07002043 size=fs.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08002044 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002045 results.append((h, f))
2046 else:
2047          raise Error('%s is neither a file nor a directory.' % f)
2048 except OSError:
2049 raise Error('Failed to process %s.' % f)
maruel064c0a32016-04-05 11:47:15 -07002050 uploaded = storage.upload_items(items_to_upload)
2051 cold = [i for i in items_to_upload if i in uploaded]
2052 hot = [i for i in items_to_upload if i not in uploaded]
2053 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002054 finally:
maruel12e30012015-10-09 11:55:35 -07002055 if tempdir and fs.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04002056 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002057
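# Sketch: consuming the (results, cold, hot) triple, mirroring what archive()
# below prints. 'storage' and 'files' are assumed to come from get_storage()
# and the command line; the empty blacklist keeps every file.
def _example_archive_files(storage, files):
  results, cold, hot = archive_files_to_storage(storage, files, lambda _: False)
  for digest, path in results:
    print('%s %s' % (digest, path))
  return len(cold), len(hot)
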
2058
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002059def archive(out, namespace, files, blacklist):
2060 if files == ['-']:
2061 files = sys.stdin.readlines()
2062
2063 if not files:
2064 raise Error('Nothing to upload')
2065
2066 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002067 blacklist = tools.gen_blacklist(blacklist)
2068 with get_storage(out, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07002069 # Ignore stats.
2070 results = archive_files_to_storage(storage, files, blacklist)[0]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002071 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
2072
2073
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002074@subcommand.usage('<file1..fileN> or - to read from stdin')
2075def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002076 """Archives data to the server.
2077
2078  If a directory is specified, a .isolated file is created and the whole
2079  directory is uploaded. Then this .isolated file can be included in another
2080  one to run commands.
2081
2082  The command outputs each file that was processed with its content hash. For
2083 directories, the .isolated generated for the directory is listed as the
2084 directory entry itself.
2085 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002086 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002087 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002088 options, files = parser.parse_args(args)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002089 process_isolate_server_options(parser, options, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002090 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002091 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002092 except Error as e:
2093 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002094 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002095
2096
2097def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002098 """Download data from the server.
2099
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002100 It can either download individual files or a complete tree from a .isolated
2101 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002102 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002103 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002104 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05002105 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002106 help='hash of an isolated file, .isolated file content is discarded, use '
2107 '--file if you need it')
2108 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002109 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
2110 help='hash and destination of a file, can be used multiple times')
2111 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002112 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002113 help='destination directory')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002114 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002115 options, args = parser.parse_args(args)
2116 if args:
2117 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002118
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002119 process_isolate_server_options(parser, options, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002120 if bool(options.isolated) == bool(options.file):
2121 parser.error('Use one of --isolated or --file, and only one.')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002122
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002123 cache = process_cache_options(options)
maruel12e30012015-10-09 11:55:35 -07002124 options.target = unicode(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002125 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07002126 if (fs.isfile(options.target) or
2127 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002128 parser.error(
2129 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002130 with get_storage(options.isolate_server, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08002131 # Fetching individual files.
2132 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002133 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08002134 channel = threading_utils.TaskChannel()
2135 pending = {}
2136 for digest, dest in options.file:
2137 pending[digest] = dest
2138 storage.async_fetch(
2139 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002140 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002141 digest,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002142 UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002143 functools.partial(file_write, os.path.join(options.target, dest)))
2144 while pending:
2145 fetched = channel.pull()
2146 dest = pending.pop(fetched)
2147 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002148
Vadim Shtayura3172be52013-12-03 12:49:05 -08002149 # Fetching whole isolated tree.
2150 if options.isolated:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002151 with cache:
2152 bundle = fetch_isolated(
2153 isolated_hash=options.isolated,
2154 storage=storage,
2155 cache=cache,
maruelb8d88d12016-04-08 12:54:01 -07002156 outdir=options.target)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002157 if bundle.command:
2158 rel = os.path.join(options.target, bundle.relative_cwd)
2159 print('To run this test please run from the directory %s:' %
2160                rel)
2161 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002162
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002163 return 0
2164
2165
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002166def add_archive_options(parser):
2167 parser.add_option(
2168 '--blacklist',
2169 action='append', default=list(DEFAULT_BLACKLIST),
2170 help='List of regexp to use as blacklist filter when uploading '
2171 'directories')
2172
2173
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002174def add_isolate_server_options(parser):
2175 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002176 parser.add_option(
2177 '-I', '--isolate-server',
2178 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002179 help='URL of the Isolate Server to use. Defaults to the environment '
2180 'variable ISOLATE_SERVER if set. No need to specify https://, this '
2181 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002182 parser.add_option(
2183 '--namespace', default='default-gzip',
2184 help='The namespace to use on the Isolate Server, default: %default')
2185
2186
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002187def process_isolate_server_options(parser, options, set_exception_handler):
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002188 """Processes the --isolate-server option and aborts if not specified.
2189
2190 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002191 """
2192 if not options.isolate_server:
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002193 parser.error('--isolate-server is required.')
Marc-Antoine Ruel012067b2014-12-10 15:45:42 -05002194 try:
2195 options.isolate_server = net.fix_url(options.isolate_server)
2196 except ValueError as e:
2197 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002198 if set_exception_handler:
2199 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002200 try:
2201 return auth.ensure_logged_in(options.isolate_server)
2202 except ValueError as e:
2203 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002204
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002205
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002206def add_cache_options(parser):
2207 cache_group = optparse.OptionGroup(parser, 'Cache management')
2208 cache_group.add_option(
2209 '--cache', metavar='DIR',
2210 help='Directory to keep a local cache of the files. Accelerates download '
2211 'by reusing already downloaded files. Default=%default')
2212 cache_group.add_option(
2213 '--max-cache-size',
2214 type='int',
2215 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08002216 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002217 help='Trim if the cache gets larger than this value, default=%default')
2218 cache_group.add_option(
2219 '--min-free-space',
2220 type='int',
2221 metavar='NNN',
2222 default=2*1024*1024*1024,
2223 help='Trim if disk free space becomes lower than this value, '
2224 'default=%default')
2225 cache_group.add_option(
2226 '--max-items',
2227 type='int',
2228 metavar='NNN',
2229 default=100000,
2230 help='Trim if more than this number of items are in the cache '
2231 'default=%default')
2232 parser.add_option_group(cache_group)
2233
2234
2235def process_cache_options(options):
2236 if options.cache:
2237 policies = CachePolicies(
2238 options.max_cache_size, options.min_free_space, options.max_items)
2239
2240 # |options.cache| path may not exist until DiskCache() instance is created.
2241 return DiskCache(
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002242 unicode(os.path.abspath(options.cache)),
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002243 policies,
2244 isolated_format.get_hash_algo(options.namespace))
2245 else:
2246 return MemoryCache()
2247
2248
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002249class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002250 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002251 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002252 self,
2253 version=__version__,
2254 prog=os.path.basename(sys.modules[__name__].__file__),
2255 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002256 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002257
2258 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002259 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002260 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002261 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002262 return options, args
2263
2264
2265def main(args):
2266 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04002267 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002268
2269
2270if __name__ == '__main__':
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002271 fix_encoding.fix_encoding()
2272 tools.disable_buffering()
2273 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002274 sys.exit(main(sys.argv[1:]))