#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed by the Apache v2.0 license that can be
# found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.4.8'

import base64
import functools
import logging
import optparse
import os
import re
import signal
import sys
import tempfile
import threading
import time
import types
import urllib
import urlparse
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of the isolate protocol passed to the server in /handshake requests.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to the log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check with the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). The first ITEMS_PER_CONTAINS_QUERIES[0] files are
# taken and sent to '/pre-upload', then the next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. The numbers here are a trade-off; the more per request, the lower
# the effect of HTTP round trip latency and TCP-level chattiness. On the other
# hand, larger values cause longer lookups, increasing the initial latency to
# start uploading, which is especially an issue for large files. This value is
# optimized for the "few thousand files to look up with a minimal number of
# large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from a network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)

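# A quick illustration of the two patterns above (a sketch, not used by the
# code; the paths are hypothetical): re.match(DEFAULT_BLACKLIST[0],
# 'foo/bar.pyc') and re.match(DEFAULT_BLACKLIST[1], 'src/.git') both match,
# assuming a '/' path separator for the second one.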

# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates intermediate directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total

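# A minimal round-trip sketch for file_write()/file_read() (illustrative only;
# the path is hypothetical):
#   n = file_write('/tmp/example.bin', iter(['abc', 'def']))  # n == 6
#   data = ''.join(file_read('/tmp/example.bin'))             # data == 'abcdef'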

def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

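# A round-trip sketch for the two helpers above (illustrative only):
#   packed = ''.join(zip_compress(iter(['hello ', 'world'])))
#   assert ''.join(zip_decompress([packed])) == 'hello world'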

def get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  file_ext = os.path.splitext(filename)[1].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7

maruel@chromium.orgaf254852013-09-17 17:48:14 +0000199def create_directories(base_directory, files):
200 """Creates the directory structure needed by the given list of files."""
201 logging.debug('create_directories(%s, %d)', base_directory, len(files))
202 # Creates the tree of directories to create.
203 directories = set(os.path.dirname(f) for f in files)
204 for item in list(directories):
205 while item:
206 directories.add(item)
207 item = os.path.dirname(item)
208 for d in sorted(directories):
209 if d:
maruel12e30012015-10-09 11:55:35 -0700210 fs.mkdir(os.path.join(base_directory, d))
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000211
212
Marc-Antoine Ruelccafe0e2013-11-08 16:15:36 -0500213def create_symlinks(base_directory, files):
214 """Creates any symlinks needed by the given set of files."""
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000215 for filepath, properties in files:
216 if 'l' not in properties:
217 continue
218 if sys.platform == 'win32':
Marc-Antoine Ruelccafe0e2013-11-08 16:15:36 -0500219 # TODO(maruel): Create symlink via the win32 api.
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000220 logging.warning('Ignoring symlink %s', filepath)
221 continue
222 outfile = os.path.join(base_directory, filepath)
Marc-Antoine Ruelccafe0e2013-11-08 16:15:36 -0500223 # os.symlink() doesn't exist on Windows.
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000224 os.symlink(properties['l'], outfile) # pylint: disable=E1101
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000225
226
maruel12e30012015-10-09 11:55:35 -0700227def is_valid_file(path, size):
maruel@chromium.orgdedbf492013-09-12 20:42:11 +0000228 """Determines if the given files appears valid.
229
230 Currently it just checks the file's size.
231 """
Vadim Shtayura3148e072014-09-02 18:51:52 -0700232 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -0700233 return fs.isfile(path)
234 actual_size = fs.stat(path).st_size
maruel@chromium.orgdedbf492013-09-12 20:42:11 +0000235 if size != actual_size:
236 logging.warning(
237 'Found invalid item %s; %d != %d',
maruel12e30012015-10-09 11:55:35 -0700238 os.path.basename(path), actual_size, size)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +0000239 return False
240 return True
241
242
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000243class Item(object):
244 """An item to push to Storage.
245
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800246 Its digest and size may be provided in advance, if known. Otherwise they will
247 be derived from content(). If digest is provided, it MUST correspond to
248 hash algorithm used by Storage.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000249
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800250 When used with Storage, Item starts its life in a main thread, travels
251 to 'contains' thread, then to 'push' thread and then finally back to
252 the main thread. It is never used concurrently from multiple threads.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000253 """
254
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800255 def __init__(self, digest=None, size=None, high_priority=False):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000256 self.digest = digest
257 self.size = size
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800258 self.high_priority = high_priority
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000259 self.compression_level = 6
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000260
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800261 def content(self):
262 """Iterable with content of this item as byte string (str) chunks."""
263 raise NotImplementedError()
264
265 def prepare(self, hash_algo):
266 """Ensures self.digest and self.size are set.
267
268 Uses content() as a source of data to calculate them. Does nothing if digest
269 and size is already known.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000270
271 Arguments:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800272 hash_algo: hash algorithm to use to calculate digest.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000273 """
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800274 if self.digest is None or self.size is None:
275 digest = hash_algo()
276 total = 0
277 for chunk in self.content():
278 digest.update(chunk)
279 total += len(chunk)
280 self.digest = digest.hexdigest()
281 self.size = total
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000282
283
284class FileItem(Item):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800285 """A file to push to Storage.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000286
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800287 Its digest and size may be provided in advance, if known. Otherwise they will
288 be derived from the file content.
289 """
290
291 def __init__(self, path, digest=None, size=None, high_priority=False):
292 super(FileItem, self).__init__(
293 digest,
maruel12e30012015-10-09 11:55:35 -0700294 size if size is not None else fs.stat(path).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800295 high_priority)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000296 self.path = path
297 self.compression_level = get_zip_compression_level(path)
298
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800299 def content(self):
300 return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000301
302
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000303class BufferItem(Item):
304 """A byte buffer to push to Storage."""
305
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800306 def __init__(self, buf, high_priority=False):
307 super(BufferItem, self).__init__(None, len(buf), high_priority)
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000308 self.buffer = buf
309
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800310 def content(self):
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000311 return [self.buffer]
312
313
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000314class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800315 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000316
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800317 Implements compression support, parallel 'contains' checks, parallel uploads
318 and more.
319
320 Works only within single namespace (and thus hashing algorithm and compression
321 scheme are fixed).
322
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400323 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
324 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800325 """
326
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700327 def __init__(self, storage_api):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000328 self._storage_api = storage_api
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400329 self._use_zip = isolated_format.is_namespace_with_compression(
330 storage_api.namespace)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400331 self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000332 self._cpu_thread_pool = None
333 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400334 self._aborted = False
335 self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000336
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000337 @property
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700338 def hash_algo(self):
339 """Hashing algorithm used to name files in storage based on their content.
340
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400341 Defined by |namespace|. See also isolated_format.get_hash_algo().
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700342 """
343 return self._hash_algo
344
345 @property
346 def location(self):
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -0500347 """URL of the backing store that this class is using."""
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700348 return self._storage_api.location
349
350 @property
351 def namespace(self):
352 """Isolate namespace used by this storage.
353
354 Indirectly defines hashing scheme and compression method used.
355 """
356 return self._storage_api.namespace
357
358 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000359 def cpu_thread_pool(self):
360 """ThreadPool for CPU-bound tasks like zipping."""
361 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500362 threads = max(threading_utils.num_processors(), 2)
363 if sys.maxsize <= 2L**32:
364 # On 32 bits userland, do not try to use more than 16 threads.
365 threads = min(threads, 16)
366 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000367 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000368
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000369 @property
370 def net_thread_pool(self):
371 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
372 if self._net_thread_pool is None:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700373 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000374 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000375
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000376 def close(self):
377 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400378 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000379 if self._cpu_thread_pool:
380 self._cpu_thread_pool.join()
381 self._cpu_thread_pool.close()
382 self._cpu_thread_pool = None
383 if self._net_thread_pool:
384 self._net_thread_pool.join()
385 self._net_thread_pool.close()
386 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400387 logging.info('Done.')
388
389 def abort(self):
390 """Cancels any pending or future operations."""
391 # This is not strictly theadsafe, but in the worst case the logging message
392 # will be printed twice. Not a big deal. In other places it is assumed that
393 # unprotected reads and writes to _aborted are serializable (it is true
394 # for python) and thus no locking is used.
395 if not self._aborted:
396 logging.warning('Aborting... It can take a while.')
397 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000398
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000399 def __enter__(self):
400 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400401 assert not self._prev_sig_handlers, self._prev_sig_handlers
402 for s in (signal.SIGINT, signal.SIGTERM):
403 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000404 return self
405
406 def __exit__(self, _exc_type, _exc_value, _traceback):
407 """Context manager interface."""
408 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400409 while self._prev_sig_handlers:
410 s, h = self._prev_sig_handlers.popitem()
411 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000412 return False
413
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000414 def upload_items(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800415 """Uploads a bunch of items to the isolate server.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000416
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800417 It figures out what items are missing from the server and uploads only them.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000418
419 Arguments:
420 items: list of Item instances that represents data to upload.
421
422 Returns:
423 List of items that were uploaded. All other items are already there.
424 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700425 logging.info('upload_items(items=%d)', len(items))
426
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800427 # Ensure all digests are calculated.
428 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700429 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800430
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000431 # For each digest keep only first Item that matches it. All other items
432 # are just indistinguishable copies from the point of view of isolate
433 # server (it doesn't care about paths at all, only content and digests).
434 seen = {}
435 duplicates = 0
436 for item in items:
437 if seen.setdefault(item.digest, item) is not item:
438 duplicates += 1
439 items = seen.values()
440 if duplicates:
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700441 logging.info('Skipped %d files with duplicated content', duplicates)
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000442
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000443 # Enqueue all upload tasks.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000444 missing = set()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000445 uploaded = []
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800446 channel = threading_utils.TaskChannel()
447 for missing_item, push_state in self.get_missing_items(items):
448 missing.add(missing_item)
449 self.async_push(channel, missing_item, push_state)
450
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000451 # No need to spawn deadlock detector thread if there's nothing to upload.
452 if missing:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700453 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000454 # Wait for all started uploads to finish.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000455 while len(uploaded) != len(missing):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000456 detector.ping()
457 item = channel.pull()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000458 uploaded.append(item)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000459 logging.debug(
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000460 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000461 logging.info('All files are uploaded')
462
463 # Print stats.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000464 total = len(items)
465 total_size = sum(f.size for f in items)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000466 logging.info(
467 'Total: %6d, %9.1fkb',
468 total,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000469 total_size / 1024.)
470 cache_hit = set(items) - missing
471 cache_hit_size = sum(f.size for f in cache_hit)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000472 logging.info(
473 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
474 len(cache_hit),
475 cache_hit_size / 1024.,
476 len(cache_hit) * 100. / total,
477 cache_hit_size * 100. / total_size if total_size else 0)
478 cache_miss = missing
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000479 cache_miss_size = sum(f.size for f in cache_miss)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000480 logging.info(
481 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
482 len(cache_miss),
483 cache_miss_size / 1024.,
484 len(cache_miss) * 100. / total,
485 cache_miss_size * 100. / total_size if total_size else 0)
486
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000487 return uploaded
488
  def async_push(self, channel, item, push_state):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with
    a 'get_missing_items' call. 'get_missing_items' returns a |push_state|
    object that contains storage-specific information describing how to upload
    the item (for example, in the case of cloud storage, signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when the upload ends.
      item: item to upload, as an instance of the Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when the upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Until that's done, assemble
      # the content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload, as an instance of the Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts an asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when the download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare the reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through a verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # The verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with cpu_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage-specific
          information describing how to upload the item (for example, in the
          case of cloud storage, signed upload URLs). It can later be passed
          to 'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


def batch_items_for_check(items):
  """Splits a list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries

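# For example (illustrative only), checking 200 equally-sized items yields
# batches of 20, 20, 50, 50, 50 and a final batch of 10, following the
# ITEMS_PER_CONTAINS_QUERIES schedule defined at the top of this file:
#   sizes = [len(b) for b in batch_items_for_check(items)]  # |items| assumed.
#   # sizes == [20, 20, 50, 50, 50, 10]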

class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts an asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use; verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify the cache to update the item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if the item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # The item is corrupted; remove it from the cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to the assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it had been fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns the number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())

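# A download flow sketch using FetchQueue (illustrative only; |storage|,
# |cache|, |wanted| and |root_digest| are hypothetical):
#   queue = FetchQueue(storage, cache)
#   for digest, size in wanted:
#     queue.add(digest, size)
#   queue.wait([root_digest])  # Blocks until that specific item is available.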

class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies that |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOErrors produced by the consumer in MappingError exceptions,
    since otherwise Storage would retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))


class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. The Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines the hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of the item to download.
      offset: offset (in bytes) from the start of the file to resume fetch
          from.

    Yields:
      Chunks of the downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by the |content| generator.

    |item| MUST go through a 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, the data that should be
    pushed and the data provided by the item are not the same. In that case
    |content| is not None and it yields chunks of compressed data (using
    item.content() as a source of the original uncompressed data). This is
    implemented by the Storage class.

    Arguments:
      item: Item object that holds information about the item being pushed.
      push_state: push state object as returned by the 'contains' call.
      content: a generator that yields chunks to push; item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict of missing Item -> opaque push state object to be passed to
      'push'. See the docstring for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = '_ah/api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size

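# A sketch of the two upload modes (illustrative only; the dicts mirror the
# fields this file reads from a /preupload response):
#   _IsolateServerPushState({'upload_ticket': '...'}, 123)
#       -> inline store via store_inline; no finalization step.
#   _IsolateServerPushState(
#       {'upload_ticket': '...', 'gs_upload_url': 'https://...'}, 123)
#       -> direct PUT to Google Storage, then finalize_gs_upload.
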
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000899class IsolateServer(StorageApi):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000900 """StorageApi implementation that downloads and uploads to Isolate Server.
901
902 It uploads and downloads directly from Google Storage whenever appropriate.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800903 Works only within single namespace.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000904 """
905
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000906 def __init__(self, base_url, namespace):
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000907 super(IsolateServer, self).__init__()
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -0500908 assert file_path.is_url(base_url), base_url
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700909 self._base_url = base_url.rstrip('/')
910 self._namespace = namespace
Cory Massarocc19c8c2015-03-10 13:35:11 -0700911 self._namespace_dict = {
912 'compression': 'flate' if namespace.endswith(
913 ('-gzip', '-flate')) else '',
914 'digest_hash': 'sha-1',
915 'namespace': namespace,
916 }
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000917 self._lock = threading.Lock()
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000918 self._server_caps = None
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -0500919 self._memory_use = 0
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000920
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000921 @property
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000922 def _server_capabilities(self):
Cory Massarocc19c8c2015-03-10 13:35:11 -0700923 """Gets server details.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000924
925 Returns:
Cory Massarocc19c8c2015-03-10 13:35:11 -0700926 Server capabilities dictionary as returned by /server_details endpoint.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000927 """
maruel@chromium.org3e42ce82013-09-12 18:36:59 +0000928 # TODO(maruel): Make this request much earlier asynchronously while the
929 # files are being enumerated.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800930
931 # TODO(vadimsh): Put |namespace| in the URL so that server can apply
932 # namespace-level ACLs to this call.
Cory Massarocc19c8c2015-03-10 13:35:11 -0700933
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000934 with self._lock:
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000935 if self._server_caps is None:
Cory Massarocc19c8c2015-03-10 13:35:11 -0700936 self._server_caps = net.url_read_json(
937 url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
938 data={})
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000939 return self._server_caps
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000940
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700941 @property
942 def location(self):
943 return self._base_url
944
945 @property
946 def namespace(self):
947 return self._namespace
948
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800949 def fetch(self, digest, offset=0):
Cory Massarocc19c8c2015-03-10 13:35:11 -0700950 assert offset >= 0
951 source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
952 self._base_url)
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800953 logging.debug('download_file(%s, %d)', source_url, offset)
Cory Massarocc19c8c2015-03-10 13:35:11 -0700954 response = self.do_fetch(source_url, digest, offset)
maruel@chromium.orge45728d2013-09-16 23:23:22 +0000955
Cory Massarocc19c8c2015-03-10 13:35:11 -0700956 if not response:
maruele154f9c2015-09-14 11:03:15 -0700957 raise IOError(
958 'Attempted to fetch from %s; no data exist: %s / %s.' % (
959 source_url, self._namespace, digest))
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800960
Cory Massarocc19c8c2015-03-10 13:35:11 -0700961 # for DB uploads
962 content = response.get('content')
963 if content is not None:
maruel863ac262016-03-17 11:00:37 -0700964 yield base64.b64decode(content)
965 return
Cory Massarocc19c8c2015-03-10 13:35:11 -0700966
967 # for GS entities
968 connection = net.url_open(response['url'])
maruelf5574752015-09-17 13:40:27 -0700969 if not connection:
970 raise IOError('Failed to download %s / %s' % (self._namespace, digest))
Cory Massarocc19c8c2015-03-10 13:35:11 -0700971
972 # If |offset|, verify server respects it by checking Content-Range.
Vadim Shtayuraf0cb97a2013-12-05 13:57:49 -0800973 if offset:
974 content_range = connection.get_header('Content-Range')
975 if not content_range:
976 raise IOError('Missing Content-Range header')
977
978 # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
979 # According to a spec, <size> can be '*' meaning "Total size of the file
980 # is not known in advance".
981 try:
982 match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
983 if not match:
984 raise ValueError()
985 content_offset = int(match.group(1))
986 last_byte_index = int(match.group(2))
987 size = None if match.group(3) == '*' else int(match.group(3))
988 except ValueError:
989 raise IOError('Invalid Content-Range header: %s' % content_range)
990
991 # Ensure returned offset equals requested one.
992 if offset != content_offset:
993 raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
994 offset, content_offset, content_range))
995
996 # Ensure entire tail of the file is returned.
997 if size is not None and last_byte_index + 1 != size:
998 raise IOError('Incomplete response. Content-Range: %s' % content_range)
999
maruel863ac262016-03-17 11:00:37 -07001000 for data in connection.iter_content(NET_IO_FILE_CHUNK):
1001 yield data
maruel@chromium.orge45728d2013-09-16 23:23:22 +00001002
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001003 def push(self, item, push_state, content=None):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001004 assert isinstance(item, Item)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001005 assert item.digest is not None
1006 assert item.size is not None
1007 assert isinstance(push_state, _IsolateServerPushState)
1008 assert not push_state.finalized
1009
1010 # Default to item.content().
1011 content = item.content() if content is None else content
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001012 logging.info('Push state size: %d', push_state.size)
1013 if isinstance(content, (basestring, list)):
1014 # Memory is already used, too late.
1015 with self._lock:
1016 self._memory_use += push_state.size
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001017 else:
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001018 # TODO(vadimsh): Do not read from |content| generator when retrying push.
1019 # If |content| is indeed a generator, it can not be re-winded back to the
1020 # beginning of the stream. A retry will find it exhausted. A possible
1021 # solution is to wrap |content| generator with some sort of caching
1022 # restartable generator. It should be done alongside streaming support
1023 # implementation.
1024 #
1025 # In theory, we should keep the generator, so that it is not serialized in
1026 # memory. Sadly net.HttpService.request() requires the body to be
1027 # serialized.
1028 assert isinstance(content, types.GeneratorType), repr(content)
1029 slept = False
1030 # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
Marc-Antoine Ruele6677c82015-02-05 14:54:22 -05001031 # One byte less than 512mb. This is to cope with incompressible content.
1032 max_size = int(sys.maxsize * 0.25)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001033 while True:
1034 with self._lock:
1035 # This is due to 32 bits python when uploading very large files. The
1036 # problem is that it's comparing uncompressed sizes, while we care
1037 # about compressed sizes since it's what is serialized in memory.
1038 # The first check assumes large files are compressible and that by
1039 # throttling one upload at once, we can survive. Otherwise, kaboom.
1040 memory_use = self._memory_use
1041 if ((push_state.size >= max_size and not memory_use) or
1042 (memory_use + push_state.size <= max_size)):
1043 self._memory_use += push_state.size
1044 memory_use = self._memory_use
1045 break
1046 time.sleep(0.1)
1047 slept = True
1048 if slept:
1049 logging.info('Unblocked: %d %d', memory_use, push_state.size)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +00001050
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001051 try:
1052 # This push operation may be a retry after failed finalization call below,
1053 # no need to reupload contents in that case.
1054 if not push_state.uploaded:
1055 # PUT file to |upload_url|.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001056 success = self.do_push(push_state, content)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001057 if not success:
Cory Massarocc19c8c2015-03-10 13:35:11 -07001058 raise IOError('Failed to upload file with hash %s to URL %s' % (
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001059 item.digest, push_state.upload_url))
1060 push_state.uploaded = True
1061 else:
1062 logging.info(
1063 'A file %s already uploaded, retrying finalization only',
1064 item.digest)
1065
1066 # Optionally notify the server that it's done.
1067 if push_state.finalize_url:
1068 # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
1069 # send it to isolated server. That way isolate server can verify that
1070 # the data safely reached Google Storage (GS provides MD5 and CRC32C of
1071 # stored files).
1072 # TODO(maruel): Fix the server to accept properly data={} so
1073 # url_read_json() can be used.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001074 response = net.url_read_json(
1075 url='%s/%s' % (self._base_url, push_state.finalize_url),
1076 data={
1077 'upload_ticket': push_state.preupload_status['upload_ticket'],
1078 })
1079 if not response or not response['ok']:
1080 raise IOError('Failed to finalize file with hash %s.' % item.digest)
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -05001081 push_state.finalized = True
1082 finally:
1083 with self._lock:
1084 self._memory_use -= push_state.size
maruel@chromium.orgd1e20c92013-09-17 20:54:26 +00001085
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001086 def contains(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001087 # Ensure all items were initialized with 'prepare' call. Storage does that.
1088 assert all(i.digest is not None and i.size is not None for i in items)
1089
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001090 # Request body is a json encoded list of dicts.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001091 body = {
1092 'items': [
1093 {
1094 'digest': item.digest,
1095 'is_isolated': bool(item.high_priority),
1096 'size': item.size,
1097 } for item in items
1098 ],
1099 'namespace': self._namespace_dict,
1100 }
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001101
Cory Massarocc19c8c2015-03-10 13:35:11 -07001102 query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001103
1104 # Response body is a list of push_urls (or null if file is already present).
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001105 response = None
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001106 try:
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001107 response = net.url_read_json(url=query_url, data=body)
1108 if response is None:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001109 raise isolated_format.MappingError(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001110 'Failed to execute preupload query')
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001111 except ValueError as err:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001112 raise isolated_format.MappingError(
Marc-Antoine Ruel0a620612014-08-13 15:47:07 -04001113 'Invalid response from server: %s, body is %s' % (err, response))
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001114
1115 # Pick Items that are missing, attach _PushState to them.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001116 missing_items = {}
Cory Massarocc19c8c2015-03-10 13:35:11 -07001117 for preupload_status in response.get('items', []):
1118 assert 'upload_ticket' in preupload_status, (
1119 preupload_status, '/preupload did not generate an upload ticket')
1120 index = int(preupload_status['index'])
1121 missing_items[items[index]] = _IsolateServerPushState(
1122 preupload_status, items[index].size)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +00001123 logging.info('Queried %d files, %d cache hit',
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +00001124 len(items), len(items) - len(missing_items))
1125 return missing_items
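
  # Illustrative /preupload exchange, inferred from the code above (the exact
  # server-side field set beyond 'index' and 'upload_ticket' is an
  # assumption):
  #   request:  {'items': [{'digest': '<hex>', 'is_isolated': False,
  #                         'size': 123}],
  #              'namespace': {...}}
  #   response: {'items': [{'index': '0', 'upload_ticket': '<opaque>'}]}
  # Entries absent from response['items'] are already stored on the server.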
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001126
Cory Massarocc19c8c2015-03-10 13:35:11 -07001127 def do_fetch(self, url, digest, offset):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001128 """Fetches isolated data from the URL.
1129
1130 Used only for fetching files, not for API calls. Can be overridden in
1131 subclasses.
1132
1133 Args:
1134      url: URL to fetch the data from; may return an HTTP redirect.
      digest: hash digest of the item to fetch.
1135      offset: byte offset inside the file to start fetching from.
1136
1137    Returns:
1138      Parsed JSON response, as returned by net.url_read_json.
1139 """
Cory Massarocc19c8c2015-03-10 13:35:11 -07001140 assert isinstance(offset, int)
1141 data = {
1142 'digest': digest.encode('utf-8'),
1143 'namespace': self._namespace_dict,
1144 'offset': offset,
1145 }
maruel0c25f4f2015-12-15 05:41:17 -08001146 # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
1147 # is added.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001148 return net.url_read_json(
1149 url=url,
1150 data=data,
1151 read_timeout=DOWNLOAD_READ_TIMEOUT)
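
  # The request body assembled above, illustrated with placeholder values:
  #   {'digest': 'deadbeef...', 'namespace': {...}, 'offset': 0}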
Vadim Shtayura8623c272014-12-01 11:45:27 -08001152
Cory Massarocc19c8c2015-03-10 13:35:11 -07001153 def do_push(self, push_state, content):
Vadim Shtayura8623c272014-12-01 11:45:27 -08001154 """Uploads isolated file to the URL.
1155
1156 Used only for storing files, not for API calls. Can be overridden in
1157 subclasses.
1158
1159 Args:
1160 url: URL to upload the data to.
Cory Massarocc19c8c2015-03-10 13:35:11 -07001161 push_state: an _IsolateServicePushState instance
1162 item: the original Item to be uploaded
Vadim Shtayura8623c272014-12-01 11:45:27 -08001163 content: an iterable that yields 'str' chunks.
Vadim Shtayura8623c272014-12-01 11:45:27 -08001164 """
1165    # A cheesy way to avoid a memcpy of the (possibly huge) file, until streaming
1166 # upload support is implemented.
1167 if isinstance(content, list) and len(content) == 1:
1168 content = content[0]
1169 else:
1170 content = ''.join(content)
Cory Massarocc19c8c2015-03-10 13:35:11 -07001171
1172 # DB upload
1173 if not push_state.finalize_url:
1174 url = '%s/%s' % (self._base_url, push_state.upload_url)
1175 content = base64.b64encode(content)
1176 data = {
1177 'upload_ticket': push_state.preupload_status['upload_ticket'],
1178 'content': content,
1179 }
1180 response = net.url_read_json(url=url, data=data)
1181 return response is not None and response['ok']
1182
1183 # upload to GS
1184 url = push_state.upload_url
Vadim Shtayura8623c272014-12-01 11:45:27 -08001185 response = net.url_read(
Cory Massarocc19c8c2015-03-10 13:35:11 -07001186 content_type='application/octet-stream',
1187 data=content,
1188 method='PUT',
tandriib44d54d2016-02-10 11:31:41 -08001189 headers={'Cache-Control': 'public, max-age=31536000'},
Cory Massarocc19c8c2015-03-10 13:35:11 -07001190 url=url)
Vadim Shtayura8623c272014-12-01 11:45:27 -08001191 return response is not None
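
  # The two upload paths above, summarized. Payload shapes are taken straight
  # from the code; anything beyond them is an assumption:
  #   - DB upload (no finalize_url): POST JSON
  #       {'upload_ticket': <ticket>, 'content': <base64 bytes>}
  #     to self._base_url + '/' + push_state.upload_url.
  #   - GS upload (finalize_url present): raw PUT of the bytes to
  #     push_state.upload_url, finalized later by the caller via
  #     push_state.finalize_url.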
1192
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001193
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001194class LocalCache(object):
1195 """Local cache that stores objects fetched via Storage.
1196
1197 It can be accessed concurrently from multiple threads, so it should protect
1198 its internal state with some lock.
1199 """
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001200 cache_dir = None
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001201
maruel064c0a32016-04-05 11:47:15 -07001202 def __init__(self):
1203 self._lock = threading_utils.LockWithAssert()
1204 # Profiling values.
1205 self._added = []
1206 self._initial_number_items = 0
1207 self._initial_size = 0
1208 self._evicted = []
1209 self._linked = []
1210
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001211 def __enter__(self):
1212 """Context manager interface."""
1213 return self
1214
1215 def __exit__(self, _exc_type, _exec_value, _traceback):
1216 """Context manager interface."""
1217 return False
1218
maruel064c0a32016-04-05 11:47:15 -07001219 @property
1220 def added(self):
1221 return self._added[:]
1222
1223 @property
1224 def evicted(self):
1225 return self._evicted[:]
1226
1227 @property
1228 def initial_number_items(self):
1229 return self._initial_number_items
1230
1231 @property
1232 def initial_size(self):
1233 return self._initial_size
1234
1235 @property
1236 def linked(self):
1237 return self._linked[:]
1238
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001239 def cached_set(self):
1240 """Returns a set of all cached digests (always a new object)."""
1241 raise NotImplementedError()
1242
maruel36a963d2016-04-08 17:15:49 -07001243 def cleanup(self):
1244 """Deletes any corrupted item from the cache and trims it if necessary."""
1245 raise NotImplementedError()
1246
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001247 def touch(self, digest, size):
1248 """Ensures item is not corrupted and updates its LRU position.
1249
1250 Arguments:
1251 digest: hash digest of item to check.
1252 size: expected size of this item.
1253
1254 Returns:
1255 True if item is in cache and not corrupted.
1256 """
1257 raise NotImplementedError()
1258
1259 def evict(self, digest):
1260 """Removes item from cache if it's there."""
1261 raise NotImplementedError()
1262
1263 def read(self, digest):
1264 """Returns contents of the cached item as a single str."""
1265 raise NotImplementedError()
1266
1267 def write(self, digest, content):
maruel083fa552016-04-08 14:38:01 -07001268 """Reads data from |content| generator and stores it in cache.
1269
1270 Returns digest to simplify chaining.
1271 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001272 raise NotImplementedError()
1273
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001274 def hardlink(self, digest, dest, file_mode):
1275 """Ensures file at |dest| has same content as cached |digest|.
1276
1277 If file_mode is provided, it is used to set the executable bit if
1278 applicable.
1279 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001280 raise NotImplementedError()
1281
1282
1283class MemoryCache(LocalCache):
1284 """LocalCache implementation that stores everything in memory."""
1285
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001286 def __init__(self, file_mode_mask=0500):
1287 """Args:
1288      file_mode_mask: bit mask to AND the file mode with. The default makes
1289        all mapped files read-only.
1290 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001291 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001292 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001293 self._contents = {}
1294
1295 def cached_set(self):
1296 with self._lock:
1297 return set(self._contents)
1298
maruel36a963d2016-04-08 17:15:49 -07001299 def cleanup(self):
1300 pass
1301
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001302 def touch(self, digest, size):
1303 with self._lock:
1304 return digest in self._contents
1305
1306 def evict(self, digest):
1307 with self._lock:
maruel064c0a32016-04-05 11:47:15 -07001308 v = self._contents.pop(digest, None)
1309 if v is not None:
1310        # |_evicted| is a list of sizes; record the size of the evicted data.
        self._evicted.append(len(v))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001311
1312 def read(self, digest):
1313 with self._lock:
1314 return self._contents[digest]
1315
1316 def write(self, digest, content):
1317 # Assemble whole stream before taking the lock.
1318 data = ''.join(content)
1319 with self._lock:
1320 self._contents[digest] = data
maruel064c0a32016-04-05 11:47:15 -07001321 self._added.append(len(data))
maruel083fa552016-04-08 14:38:01 -07001322 return digest
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001323
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001324 def hardlink(self, digest, dest, file_mode):
1325 """Since data is kept in memory, there is no filenode to hardlink."""
maruel064c0a32016-04-05 11:47:15 -07001326 data = self.read(digest)
1327 file_write(dest, [data])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001328 if file_mode is not None:
maruel12e30012015-10-09 11:55:35 -07001329 fs.chmod(dest, file_mode & self._file_mode_mask)
maruel064c0a32016-04-05 11:47:15 -07001330 with self._lock:
1331 self._linked.append(len(data))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001332
1333
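# A minimal sketch of the LocalCache contract, exercised with MemoryCache.
# Illustrative only: the digest below is fake, and MemoryCache does not
# verify digests against contents.
def _example_memory_cache_roundtrip():
  cache = MemoryCache()
  with cache:
    # write() consumes an iterable of str chunks and returns the digest.
    digest = cache.write('fakedigest', ['hello ', 'world'])
    # For MemoryCache, touch() only checks presence; size is ignored.
    assert cache.touch(digest, size=11)
    assert cache.read(digest) == 'hello world'

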
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001334class CachePolicies(object):
1335 def __init__(self, max_cache_size, min_free_space, max_items):
1336 """
1337 Arguments:
1338 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1339 cache is effectively a leak.
1340 - min_free_space: Trim if disk free space becomes lower than this value. If
1341        0, it unconditionally fills the disk.
1342 - max_items: Maximum number of items to keep in the cache. If 0, do not
1343 enforce a limit.
1344 """
1345 self.max_cache_size = max_cache_size
1346 self.min_free_space = min_free_space
1347 self.max_items = max_items
1348
1349
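# Illustrative policies roughly matching the CLI defaults in
# add_cache_options() below: trim above 50 GiB or 100k items, and keep at
# least 2 GiB of disk free. Tune per deployment; the name is a placeholder.
_EXAMPLE_CACHE_POLICIES = CachePolicies(
    max_cache_size=50*1024*1024*1024,
    min_free_space=2*1024*1024*1024,
    max_items=100000)

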
1350class DiskCache(LocalCache):
1351 """Stateful LRU cache in a flat hash table in a directory.
1352
1353 Saves its state as json file.
1354 """
maruel12e30012015-10-09 11:55:35 -07001355 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001356
1357 def __init__(self, cache_dir, policies, hash_algo):
1358 """
1359 Arguments:
1360 cache_dir: directory where to place the cache.
1361 policies: cache retention policies.
1362      hash_algo: hashing algorithm used.
1363 """
maruel064c0a32016-04-05 11:47:15 -07001364 # All protected methods (starting with '_') except _path should be called
1365 # with self._lock held.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001366 super(DiskCache, self).__init__()
1367 self.cache_dir = cache_dir
1368 self.policies = policies
1369 self.hash_algo = hash_algo
1370 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
maruel083fa552016-04-08 14:38:01 -07001371    # Items in an LRU lookup dict(digest: size).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001372 self._lru = lru.LRUDict()
maruel083fa552016-04-08 14:38:01 -07001373 # Current cached free disk space. It is updated by self._trim().
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001374 self._free_disk = 0
maruel083fa552016-04-08 14:38:01 -07001375 # The items that must not be evicted during this run since they were
1376 # referenced.
1377 self._protected = set()
maruel36a963d2016-04-08 17:15:49 -07001378 # Cleanup operations done by self._load(), if any.
1379 self._operations = []
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001380 with tools.Profiler('Setup'):
1381 with self._lock:
maruel083fa552016-04-08 14:38:01 -07001382 # self._load() calls self._trim() which initializes self._free_disk.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001383 self._load()
1384
1385 def __enter__(self):
1386 return self
1387
1388 def __exit__(self, _exc_type, _exec_value, _traceback):
1389 with tools.Profiler('CleanupTrimming'):
1390 with self._lock:
1391 self._trim()
1392
1393 logging.info(
1394 '%5d (%8dkb) added',
1395 len(self._added), sum(self._added) / 1024)
1396 logging.info(
1397 '%5d (%8dkb) current',
1398 len(self._lru),
1399 sum(self._lru.itervalues()) / 1024)
1400 logging.info(
maruel064c0a32016-04-05 11:47:15 -07001401 '%5d (%8dkb) evicted',
1402 len(self._evicted), sum(self._evicted) / 1024)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001403 logging.info(
1404 ' %8dkb free',
1405 self._free_disk / 1024)
1406 return False
1407
1408 def cached_set(self):
1409 with self._lock:
1410 return self._lru.keys_set()
1411
maruel36a963d2016-04-08 17:15:49 -07001412 def cleanup(self):
1413    # At this point, the cache was already loaded and trimmed to respect cache
1414    # policies, and invalid files were deleted.
1415 if self._evicted:
1416 logging.info(
1417 'Evicted items with the following sizes: %s', sorted(self._evicted))
1418
1419 # What remains to be done is to hash every single item to
1420 # detect corruption, then save to ensure state.json is up to date.
1421    # Sadly, on a 50 GiB cache with 100 MiB/s I/O, this still takes over 8 minutes.
1422 # TODO(maruel): Let's revisit once directory metadata is stored in
1423 # state.json so only the files that had been mapped since the last cleanup()
1424 # call are manually verified.
1425 #
1426 #with self._lock:
1427 # for digest in self._lru:
1428 # if not isolated_format.is_valid_hash(
1429 # self._path(digest), self.hash_algo):
1430 # self.evict(digest)
1431 # logging.info('Deleted corrupted item: %s', digest)
1432
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001433 def touch(self, digest, size):
1434 """Verifies an actual file is valid.
1435
1436    Note that it doesn't compute the hash, so the file could still be
1437    corrupted if its size didn't change.
1438
1439 TODO(maruel): More stringent verification while keeping the check fast.
1440 """
1441 # Do the check outside the lock.
1442 if not is_valid_file(self._path(digest), size):
1443 return False
1444
1445    # Update its LRU position.
1446 with self._lock:
1447 if digest not in self._lru:
1448 return False
1449 self._lru.touch(digest)
maruel083fa552016-04-08 14:38:01 -07001450 self._protected.add(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001451 return True
1452
1453 def evict(self, digest):
1454 with self._lock:
maruel083fa552016-04-08 14:38:01 -07001455      # Do not check for 'digest in self._protected': a protected item may
1456      # still be evicted here because it turned out to be corrupted.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001457 self._lru.pop(digest)
1458 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1459
1460 def read(self, digest):
maruel12e30012015-10-09 11:55:35 -07001461 with fs.open(self._path(digest), 'rb') as f:
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001462 return f.read()
1463
1464 def write(self, digest, content):
Marc-Antoine Rueldf4976d2015-04-15 19:56:21 -04001465 assert content is not None
maruel083fa552016-04-08 14:38:01 -07001466 with self._lock:
1467 self._protected.add(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001468 path = self._path(digest)
1469    # A stale broken file may remain. It is also possible for the file to have
1470    # its write access bit removed, which would cause the file_write() call to
1471    # fail to open it in write mode. Take no chances here.
1472 file_path.try_remove(path)
1473 try:
1474 size = file_write(path, content)
1475 except:
1476 # There are two possible places were an exception can occur:
1477 # 1) Inside |content| generator in case of network or unzipping errors.
1478 # 2) Inside file_write itself in case of disk IO errors.
1479 # In any case delete an incomplete file and propagate the exception to
1480 # caller, it will be logged there.
1481 file_path.try_remove(path)
1482 raise
1483 # Make the file read-only in the cache. This has a few side-effects since
1484    # the file node is modified, so every directory entry to this file becomes
1485 # read-only. It's fine here because it is a new file.
1486 file_path.set_read_only(path, True)
1487 with self._lock:
1488 self._add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001489 return digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001490
1491 def hardlink(self, digest, dest, file_mode):
1492 """Hardlinks the file to |dest|.
1493
1494 Note that the file permission bits are on the file node, not the directory
1495 entry, so changing the access bit on any of the directory entries for the
1496 file node will affect them all.
1497 """
1498 path = self._path(digest)
maruel1f7e8162015-09-16 10:35:43 -07001499 if not file_path.link_file(dest, path, file_path.HARDLINK_WITH_FALLBACK):
1500 # Report to the server that it failed with more details. We'll want to
1501 # squash them all.
1502 on_error.report('Failed to hardlink\n%s -> %s' % (path, dest))
1503
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001504 if file_mode is not None:
1505 # Ignores all other bits.
maruel12e30012015-10-09 11:55:35 -07001506 fs.chmod(dest, file_mode & 0500)
maruel064c0a32016-04-05 11:47:15 -07001507 with self._lock:
1508 self._linked.append(self._lru[digest])
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001509
1510 def _load(self):
1511 """Loads state of the cache from json file."""
1512 self._lock.assert_locked()
1513
1514 if not os.path.isdir(self.cache_dir):
maruel12e30012015-10-09 11:55:35 -07001515 fs.makedirs(self.cache_dir)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001516 else:
1517 # Make sure the cache is read-only.
1518 # TODO(maruel): Calculate the cost and optimize the performance
1519 # accordingly.
1520 file_path.make_tree_read_only(self.cache_dir)
1521
1522 # Load state of the cache.
maruel12e30012015-10-09 11:55:35 -07001523 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001524 try:
1525 self._lru = lru.LRUDict.load(self.state_file)
1526 except ValueError as err:
1527 logging.error('Failed to load cache state: %s' % (err,))
1528 # Don't want to keep broken state file.
1529 file_path.try_remove(self.state_file)
1530
1531 # Ensure that all files listed in the state still exist and add new ones.
1532 previous = self._lru.keys_set()
1533 unknown = []
maruel12e30012015-10-09 11:55:35 -07001534 for filename in fs.listdir(self.cache_dir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001535 if filename == self.STATE_FILE:
1536 continue
1537 if filename in previous:
maruel064c0a32016-04-05 11:47:15 -07001538 self._initial_size += self._lru[filename]
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001539 previous.remove(filename)
maruel064c0a32016-04-05 11:47:15 -07001540 self._initial_number_items += 1
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001541 continue
1542 # An untracked file.
1543 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1544 logging.warning('Removing unknown file %s from cache', filename)
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001545 p = self._path(filename)
maruel12e30012015-10-09 11:55:35 -07001546 if fs.isdir(p):
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001547 try:
1548 file_path.rmtree(p)
1549 except OSError:
1550 pass
1551 else:
1552 file_path.try_remove(p)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001553 continue
1554 # File that's not referenced in 'state.json'.
1555 # TODO(vadimsh): Verify its SHA1 matches file name.
1556 logging.warning('Adding unknown file %s to cache', filename)
1557 unknown.append(filename)
1558
1559 if unknown:
1560 # Add as oldest files. They will be deleted eventually if not accessed.
maruel064c0a32016-04-05 11:47:15 -07001561 pairs = []
1562 for digest in unknown:
1563 size = fs.stat(self._path(digest)).st_size
1564 self._initial_size += size
1565 self._initial_number_items += 1
1566 pairs.append((digest, size))
1567 self._lru.batch_insert_oldest(pairs)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001568 logging.warning('Added back %d unknown files', len(unknown))
1569
1570 if previous:
1571 # Filter out entries that were not found.
1572 logging.warning('Removed %d lost files', len(previous))
1573 for filename in previous:
1574 self._lru.pop(filename)
1575 self._trim()
1576
1577 def _save(self):
1578 """Saves the LRU ordering."""
1579 self._lock.assert_locked()
1580 if sys.platform != 'win32':
1581 d = os.path.dirname(self.state_file)
maruel12e30012015-10-09 11:55:35 -07001582 if fs.isdir(d):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001583 # Necessary otherwise the file can't be created.
1584 file_path.set_read_only(d, False)
maruel12e30012015-10-09 11:55:35 -07001585 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001586 file_path.set_read_only(self.state_file, False)
1587 self._lru.save(self.state_file)
1588
1589 def _trim(self):
1590    """Trims anything we don't know and makes sure enough free space exists."""
1591 self._lock.assert_locked()
1592
1593 # Ensure maximum cache size.
1594 if self.policies.max_cache_size:
1595 total_size = sum(self._lru.itervalues())
1596 while total_size > self.policies.max_cache_size:
1597 total_size -= self._remove_lru_file()
1598
1599 # Ensure maximum number of items in the cache.
1600 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1601 for _ in xrange(len(self._lru) - self.policies.max_items):
1602 self._remove_lru_file()
1603
1604 # Ensure enough free space.
1605 self._free_disk = file_path.get_free_space(self.cache_dir)
1606 trimmed_due_to_space = False
1607 while (
1608 self.policies.min_free_space and
1609 self._lru and
1610 self._free_disk < self.policies.min_free_space):
1611 trimmed_due_to_space = True
1612 self._remove_lru_file()
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001613 if trimmed_due_to_space:
1614 total_usage = sum(self._lru.itervalues())
1615      usage_percent = 0.
1616      if self.policies.max_cache_size:
1617        usage_percent = (
            100. * float(total_usage) / self.policies.max_cache_size)
1618 logging.warning(
1619 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1620 'cache (%.1f%% of its maximum capacity)',
1621 self._free_disk / 1024.,
1622 total_usage / 1024.,
1623 usage_percent)
1624 self._save()
1625
1626 def _path(self, digest):
1627 """Returns the path to one item."""
1628 return os.path.join(self.cache_dir, digest)
1629
1630 def _remove_lru_file(self):
1631    """Removes the least recently used file and returns its size."""
1632 self._lock.assert_locked()
maruel083fa552016-04-08 14:38:01 -07001633 try:
1634 digest, size = self._lru.get_oldest()
1635 except KeyError:
1636 raise Error('Nothing to remove')
1637 if digest in self._protected:
1638 raise Error('Not enough space to map the whole isolated tree')
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001639 digest, size = self._lru.pop_oldest()
1640 self._delete_file(digest, size)
1641 return size
1642
1643 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1644 """Adds an item into LRU cache marking it as a newest one."""
1645 self._lock.assert_locked()
1646 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001647 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001648 self._added.append(size)
1649 self._lru.add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001650 self._free_disk -= size
1651 # Do a quicker version of self._trim(). It only enforces free disk space,
1652 # not cache size limits. It doesn't actually look at real free disk space,
1653 # only uses its cache values. self._trim() will be called later to enforce
1654 # real trimming but doing this quick version here makes it possible to map
1655 # an isolated that is larger than the current amount of free disk space when
1656 # the cache size is already large.
1657 while (
1658 self.policies.min_free_space and
1659 self._lru and
1660 self._free_disk < self.policies.min_free_space):
1661 self._remove_lru_file()
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001662
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001663 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1664 """Deletes cache file from the file system."""
1665 self._lock.assert_locked()
1666 try:
1667 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001668 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001669 file_path.try_remove(self._path(digest))
maruel064c0a32016-04-05 11:47:15 -07001670 self._evicted.append(size)
maruel083fa552016-04-08 14:38:01 -07001671 self._free_disk += size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001672 except OSError as e:
1673 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1674
1675
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001676class IsolatedBundle(object):
1677 """Fetched and parsed .isolated file with all dependencies."""
1678
Vadim Shtayura3148e072014-09-02 18:51:52 -07001679 def __init__(self):
1680 self.command = []
1681 self.files = {}
1682 self.read_only = None
1683 self.relative_cwd = None
1684 # The main .isolated file, a IsolatedFile instance.
1685 self.root = None
1686
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001687 def fetch(self, fetch_queue, root_isolated_hash, algo):
1688    """Fetches the root .isolated and all the .isolated files it includes.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001689
1690 It enables support for "included" .isolated files. They are processed in
1691 strict order but fetched asynchronously from the cache. This is important so
1692 that a file in an included .isolated file that is overridden by an embedding
1693 .isolated file is not fetched needlessly. The includes are fetched in one
1694 pass and the files are fetched as soon as all the ones on the left-side
1695 of the tree were fetched.
1696
1697 The prioritization is very important here for nested .isolated files.
1698 'includes' have the highest priority and the algorithm is optimized for both
1699 deep and wide trees. A deep one is a long link of .isolated files referenced
1700 one at a time by one item in 'includes'. A wide one has a large number of
1701 'includes' in a single .isolated file. 'left' is defined as an included
1702 .isolated file earlier in the 'includes' list. So the order of the elements
1703 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001704
1705 As a side effect this method starts asynchronous fetch of all data files
1706 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1707 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001708 """
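    # For illustration: a "deep" tree is a chain like
    #   a.isolated -> b.isolated -> c.isolated  (one include per file),
    # while a "wide" tree is
    #   a.isolated -> [b.isolated, c.isolated, ...]  (many includes in one).
    # An entry earlier in 'includes' is "left" of any later one.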
1709 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1710
1711 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1712 pending = {}
1713 # Set of hashes of already retrieved items to refuse recursive includes.
1714 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001715    # Set of IsolatedFile instances whose data files have already been fetched.
1716 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001717
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001718 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001719 h = isolated_file.obj_hash
1720 if h in seen:
1721 raise isolated_format.IsolatedError(
1722 'IsolatedFile %s is retrieved recursively' % h)
1723 assert h not in pending
1724 seen.add(h)
1725 pending[h] = isolated_file
1726 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1727
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001728 # Start fetching root *.isolated file (single file, not the whole bundle).
1729 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001730
1731 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001732 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001733 item_hash = fetch_queue.wait(pending)
1734 item = pending.pop(item_hash)
1735 item.load(fetch_queue.cache.read(item_hash))
Vadim Shtayura3148e072014-09-02 18:51:52 -07001736
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001737 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001738 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001739 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001740
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001741 # Always fetch *.isolated files in traversal order, waiting if necessary
1742 # until next to-be-processed node loads. "Waiting" is done by yielding
1743 # back to the outer loop, that waits until some *.isolated is loaded.
1744 for node in isolated_format.walk_includes(self.root):
1745 if node not in processed:
1746 # Not visited, and not yet loaded -> wait for it to load.
1747 if not node.is_loaded:
1748 break
1749 # Not visited and loaded -> process it and continue the traversal.
1750 self._start_fetching_files(node, fetch_queue)
1751 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001752
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001753 # All *.isolated files should be processed by now and only them.
1754 all_isolateds = set(isolated_format.walk_includes(self.root))
1755 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001756
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001757 # Extract 'command' and other bundle properties.
1758 for node in isolated_format.walk_includes(self.root):
1759 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001760 self.relative_cwd = self.relative_cwd or ''
1761
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001762 def _start_fetching_files(self, isolated, fetch_queue):
1763 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001764
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001765 Modifies self.files.
1766 """
1767 logging.debug('fetch_files(%s)', isolated.obj_hash)
1768 for filepath, properties in isolated.data.get('files', {}).iteritems():
1769 # Root isolated has priority on the files being mapped. In particular,
1770 # overridden files must not be fetched.
1771 if filepath not in self.files:
1772 self.files[filepath] = properties
1773 if 'h' in properties:
1774 # Preemptively request files.
1775 logging.debug('fetching %s', filepath)
1776 fetch_queue.add(
1777 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1778
1779 def _update_self(self, node):
1780 """Extracts bundle global parameters from loaded *.isolated file.
1781
1782 Will be called with each loaded *.isolated file in order of traversal of
1783 isolated include graph (see isolated_format.walk_includes).
1784 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001785 # Grabs properties.
1786 if not self.command and node.data.get('command'):
1787      # Ensure paths are correctly separated on Windows.
1788 self.command = node.data['command']
1789 if self.command:
1790 self.command[0] = self.command[0].replace('/', os.path.sep)
1791 self.command = tools.fix_python_path(self.command)
1792 if self.read_only is None and node.data.get('read_only') is not None:
1793 self.read_only = node.data['read_only']
1794 if (self.relative_cwd is None and
1795 node.data.get('relative_cwd') is not None):
1796 self.relative_cwd = node.data['relative_cwd']
1797
1798
Vadim Shtayura8623c272014-12-01 11:45:27 -08001799def set_storage_api_class(cls):
1800 """Replaces StorageApi implementation used by default."""
1801 global _storage_api_cls
1802 assert _storage_api_cls is None
1803 assert issubclass(cls, StorageApi)
1804 _storage_api_cls = cls
1805
1806
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001807def get_storage_api(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001808 """Returns an object that implements low-level StorageApi interface.
1809
1810  It is used by Storage to work with a single isolate |namespace|. It should
1811 rarely be used directly by clients, see 'get_storage' for
1812 a better alternative.
1813
1814 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001815    url: URL of the isolate service providing shared cloud-based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001816 namespace: isolate namespace to operate in, also defines hashing and
1817 compression scheme used, i.e. namespace names that end with '-gzip'
1818 store compressed data.
1819
1820 Returns:
1821 Instance of StorageApi subclass.
1822 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001823 cls = _storage_api_cls or IsolateServer
1824 return cls(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001825
1826
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001827def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001828 """Returns Storage class that can upload and download from |namespace|.
1829
1830 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001831    url: URL of the isolate service providing shared cloud-based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001832 namespace: isolate namespace to operate in, also defines hashing and
1833 compression scheme used, i.e. namespace names that end with '-gzip'
1834 store compressed data.
1835
1836 Returns:
1837 Instance of Storage.
1838 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001839 return Storage(get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001840
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001841
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001842def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001843 """Uploads the given tree to the given url.
1844
1845 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001846 base_url: The url of the isolate server to upload to.
1847 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001848 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001849 """
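  # Example |infiles| entry (illustrative values; 'h', 's', 'l' and 'priority'
  # are the metadata keys consumed below):
  #   (u'/abs/path/foo', {'h': u'<hex digest>', 's': 123})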
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001850 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001851 # Filter out symlinks, since they are not represented by items on isolate
1852 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001853 items = []
1854 seen = set()
1855 skipped = 0
1856 for filepath, metadata in infiles:
maruel12e30012015-10-09 11:55:35 -07001857 assert isinstance(filepath, unicode), filepath
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001858 if 'l' not in metadata and filepath not in seen:
1859 seen.add(filepath)
1860 item = FileItem(
1861 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001862 digest=metadata['h'],
1863 size=metadata['s'],
1864 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001865 items.append(item)
1866 else:
1867 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001868
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001869 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001870 with get_storage(base_url, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001871 return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001872
1873
maruelb8d88d12016-04-08 12:54:01 -07001874def fetch_isolated(isolated_hash, storage, cache, outdir):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001875 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001876
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001877 Arguments:
1878 isolated_hash: hash of the root *.isolated file.
1879 storage: Storage class that communicates with isolate storage.
1880 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001881 outdir: Output directory to map file tree to.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001882
1883 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001884 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001885 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001886 logging.debug(
maruelb8d88d12016-04-08 12:54:01 -07001887 'fetch_isolated(%s, %s, %s, %s)', isolated_hash, storage, cache, outdir)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001888 # Hash algorithm to use, defined by namespace |storage| is using.
1889 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001890 with cache:
1891 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001892 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001893
1894 with tools.Profiler('GetIsolateds'):
1895 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001896 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001897 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
maruel1ceb3872015-10-14 06:10:44 -07001898 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001899 try:
maruel1ceb3872015-10-14 06:10:44 -07001900 isolated_hash = fetch_queue.inject_local_file(path, algo)
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001901 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001902 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001903            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1904 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001905
1906 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001907 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001908
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001909 with tools.Profiler('GetRest'):
1910 # Create file system hierarchy.
nodire5028a92016-04-29 14:38:21 -07001911 file_path.ensure_tree(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001912 create_directories(outdir, bundle.files)
1913 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001914
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001915 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001916 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
nodire5028a92016-04-29 14:38:21 -07001917 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001918
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001919 # Multimap: digest -> list of pairs (path, props).
1920 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001921 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001922 if 'h' in props:
1923 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001924
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001925 # Now block on the remaining files to be downloaded and mapped.
1926 logging.info('Retrieving remaining files (%d of them)...',
1927 fetch_queue.pending_count)
1928 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001929 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001930 while remaining:
1931 detector.ping()
1932
1933 # Wait for any item to finish fetching to cache.
1934 digest = fetch_queue.wait(remaining)
1935
1936 # Link corresponding files to a fetched item in cache.
1937 for filepath, props in remaining.pop(digest):
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001938 cache.hardlink(
1939 digest, os.path.join(outdir, filepath), props.get('m'))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001940
1941 # Report progress.
1942 duration = time.time() - last_update
1943 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1944 msg = '%d files remaining...' % len(remaining)
1945 print msg
1946 logging.info(msg)
1947 last_update = time.time()
1948
1949 # Cache could evict some items we just tried to fetch, it's a fatal error.
1950 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001951 raise isolated_format.MappingError(
1952 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001953 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001954
1955
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001956def directory_to_metadata(root, algo, blacklist):
1957 """Returns the FileItem list and .isolated metadata for a directory."""
1958 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001959 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001960 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001961 metadata = {
1962 relpath: isolated_format.file_to_metadata(
Marc-Antoine Ruelf1d827c2014-11-24 15:22:25 -05001963 os.path.join(root, relpath), {}, 0, algo)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001964 for relpath in paths
1965 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001966 for v in metadata.itervalues():
1967 v.pop('t')
1968 items = [
1969 FileItem(
1970 path=os.path.join(root, relpath),
1971 digest=meta['h'],
1972 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001973 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001974 for relpath, meta in metadata.iteritems() if 'h' in meta
1975 ]
1976 return items, metadata
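
# Illustrative directory_to_metadata() result (field names as produced by
# isolated_format.file_to_metadata; the exact key set can vary by platform):
#   items:    [FileItem(path=u'/root/a', digest='<hex>', size=3, ...), ...]
#   metadata: {u'a': {'h': '<hex>', 's': 3, 'm': 0600}, ...}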
1977
1978
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001979def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001980 """Stores every entries and returns the relevant data.
1981
1982 Arguments:
1983 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001984 files: list of file paths to upload. If a directory is specified, a
1985 .isolated file is created and its hash is returned.
1986 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07001987
1988 Returns:
1989 tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
1990 The first file in the first item is always the isolated file.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001991 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001992 assert all(isinstance(i, unicode) for i in files), files
1993 if len(files) != len(set(map(os.path.abspath, files))):
1994 raise Error('Duplicate entries found.')
1995
maruel064c0a32016-04-05 11:47:15 -07001996 # List of tuple(hash, path).
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001997 results = []
1998 # The temporary directory is only created as needed.
1999 tempdir = None
2000 try:
2001 # TODO(maruel): Yield the files to a worker thread.
2002 items_to_upload = []
2003 for f in files:
2004 try:
2005 filepath = os.path.abspath(f)
maruel12e30012015-10-09 11:55:35 -07002006 if fs.isdir(filepath):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002007 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07002008 items, metadata = directory_to_metadata(
2009 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002010
2011 # Create the .isolated file.
2012 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002013 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
2014 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002015 os.close(handle)
2016 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002017 'algo':
2018 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002019 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002020 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002021 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04002022 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002023 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002024 items_to_upload.extend(items)
2025 items_to_upload.append(
2026 FileItem(
2027 path=isolated,
2028 digest=h,
maruel12e30012015-10-09 11:55:35 -07002029 size=fs.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08002030 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002031 results.append((h, f))
2032
maruel12e30012015-10-09 11:55:35 -07002033 elif fs.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04002034 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002035 items_to_upload.append(
2036 FileItem(
2037 path=filepath,
2038 digest=h,
maruel12e30012015-10-09 11:55:35 -07002039 size=fs.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08002040 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002041 results.append((h, f))
2042 else:
2043          raise Error('%s is neither a file nor a directory.' % f)
2044 except OSError:
2045 raise Error('Failed to process %s.' % f)
maruel064c0a32016-04-05 11:47:15 -07002046 uploaded = storage.upload_items(items_to_upload)
2047 cold = [i for i in items_to_upload if i in uploaded]
2048 hot = [i for i in items_to_upload if i not in uploaded]
2049 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002050 finally:
maruel12e30012015-10-09 11:55:35 -07002051 if tempdir and fs.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04002052 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002053
2054
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002055def archive(out, namespace, files, blacklist):
2056 if files == ['-']:
2057 files = sys.stdin.readlines()
2058
2059 if not files:
2060 raise Error('Nothing to upload')
2061
2062 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002063 blacklist = tools.gen_blacklist(blacklist)
2064 with get_storage(out, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07002065 # Ignore stats.
2066 results = archive_files_to_storage(storage, files, blacklist)[0]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002067 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
2068
2069
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002070@subcommand.usage('<file1..fileN> or - to read from stdin')
2071def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002072 """Archives data to the server.
2073
2074  If a directory is specified, a .isolated file is created and the whole directory
2075 is uploaded. Then this .isolated file can be included in another one to run
2076 commands.
2077
2078  The command outputs each file that was processed with its content hash. For
2079 directories, the .isolated generated for the directory is listed as the
2080 directory entry itself.
2081 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002082 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002083 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002084 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07002085 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002086 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05002087 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002088 except Error as e:
2089 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05002090 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002091
2092
2093def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002094 """Download data from the server.
2095
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002096 It can either download individual files or a complete tree from a .isolated
2097 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002098 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002099 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002100 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05002101 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002102 help='hash of an isolated file, .isolated file content is discarded, use '
2103 '--file if you need it')
2104 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002105 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
2106 help='hash and destination of a file, can be used multiple times')
2107 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002108 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002109 help='destination directory')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002110 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002111 options, args = parser.parse_args(args)
2112 if args:
2113 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002114
nodir55be77b2016-05-03 09:39:57 -07002115 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002116 if bool(options.isolated) == bool(options.file):
2117 parser.error('Use one of --isolated or --file, and only one.')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002118
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002119 cache = process_cache_options(options)
maruel12e30012015-10-09 11:55:35 -07002120 options.target = unicode(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002121 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07002122 if (fs.isfile(options.target) or
2123 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002124 parser.error(
2125 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002126 with get_storage(options.isolate_server, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08002127 # Fetching individual files.
2128 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002129 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08002130 channel = threading_utils.TaskChannel()
2131 pending = {}
2132 for digest, dest in options.file:
2133 pending[digest] = dest
2134 storage.async_fetch(
2135 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002136 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002137 digest,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002138 UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002139 functools.partial(file_write, os.path.join(options.target, dest)))
2140 while pending:
2141 fetched = channel.pull()
2142 dest = pending.pop(fetched)
2143 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002144
Vadim Shtayura3172be52013-12-03 12:49:05 -08002145 # Fetching whole isolated tree.
2146 if options.isolated:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002147 with cache:
2148 bundle = fetch_isolated(
2149 isolated_hash=options.isolated,
2150 storage=storage,
2151 cache=cache,
maruelb8d88d12016-04-08 12:54:01 -07002152 outdir=options.target)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002153 if bundle.command:
2154 rel = os.path.join(options.target, bundle.relative_cwd)
2155          print('To run this test please run from the directory %s:' % rel)
2157 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002158
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002159 return 0
2160
2161
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002162def add_archive_options(parser):
2163 parser.add_option(
2164 '--blacklist',
2165 action='append', default=list(DEFAULT_BLACKLIST),
2166 help='List of regexp to use as blacklist filter when uploading '
2167 'directories')
2168
2169
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002170def add_isolate_server_options(parser):
2171 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002172 parser.add_option(
2173 '-I', '--isolate-server',
2174 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002175 help='URL of the Isolate Server to use. Defaults to the environment '
2176 'variable ISOLATE_SERVER if set. No need to specify https://, this '
2177 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002178 parser.add_option(
2179 '--namespace', default='default-gzip',
2180 help='The namespace to use on the Isolate Server, default: %default')
2181
2182
nodir55be77b2016-05-03 09:39:57 -07002183def process_isolate_server_options(
2184 parser, options, set_exception_handler, required):
2185 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002186
2187 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002188 """
2189 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07002190 if required:
2191 parser.error('--isolate-server is required.')
2192 return
2193
Marc-Antoine Ruel012067b2014-12-10 15:45:42 -05002194 try:
2195 options.isolate_server = net.fix_url(options.isolate_server)
2196 except ValueError as e:
2197 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002198 if set_exception_handler:
2199 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002200 try:
2201 return auth.ensure_logged_in(options.isolate_server)
2202 except ValueError as e:
2203 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002204
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002205
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002206def add_cache_options(parser):
2207 cache_group = optparse.OptionGroup(parser, 'Cache management')
2208 cache_group.add_option(
2209 '--cache', metavar='DIR',
2210 help='Directory to keep a local cache of the files. Accelerates download '
2211 'by reusing already downloaded files. Default=%default')
2212 cache_group.add_option(
2213 '--max-cache-size',
2214 type='int',
2215 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08002216 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002217 help='Trim if the cache gets larger than this value, default=%default')
2218 cache_group.add_option(
2219 '--min-free-space',
2220 type='int',
2221 metavar='NNN',
2222 default=2*1024*1024*1024,
2223 help='Trim if disk free space becomes lower than this value, '
2224 'default=%default')
2225 cache_group.add_option(
2226 '--max-items',
2227 type='int',
2228 metavar='NNN',
2229 default=100000,
2230      help='Trim if more than this number of items are in the cache, '
2231 'default=%default')
2232 parser.add_option_group(cache_group)
2233
2234
2235def process_cache_options(options):
2236 if options.cache:
2237 policies = CachePolicies(
2238 options.max_cache_size, options.min_free_space, options.max_items)
2239
2240 # |options.cache| path may not exist until DiskCache() instance is created.
2241 return DiskCache(
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002242 unicode(os.path.abspath(options.cache)),
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002243 policies,
2244 isolated_format.get_hash_algo(options.namespace))
2245 else:
2246 return MemoryCache()
2247
2248
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002249class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002250 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002251 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002252 self,
2253 version=__version__,
2254 prog=os.path.basename(sys.modules[__name__].__file__),
2255 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002256 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002257
2258 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002259 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002260 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002261 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002262 return options, args
2263
2264
2265def main(args):
2266 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04002267 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002268
2269
2270if __name__ == '__main__':
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002271 fix_encoding.fix_encoding()
2272 tools.disable_buffering()
2273 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002274 sys.exit(main(sys.argv[1:]))