blob: cd4ab6576631644faf2925b13ea929e9b47ddb36 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
Marc-Antoine Ruel8add1242013-11-05 17:28:27 -05002# Copyright 2013 The Swarming Authors. All rights reserved.
Marc-Antoine Ruele98b1122013-11-05 20:27:57 -05003# Use of this source code is governed under the Apache License, Version 2.0 that
4# can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
"""Archives a set of files or directories to an Isolate Server."""

# Version of this script, reported to the server.
__version__ = '0.4.5'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
Cory Massarocc19c8c2015-03-10 13:35:11 -070010import base64
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +000011import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040013import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040016import signal
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000017import sys
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050018import tempfile
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000019import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000020import time
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -050021import types
maruel@chromium.orge82112e2013-04-24 14:41:55 +000022import urllib
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -050023import urlparse
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000024import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000025
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000026from third_party import colorama
27from third_party.depot_tools import fix_encoding
28from third_party.depot_tools import subcommand
29
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050030from utils import file_path
maruel12e30012015-10-09 11:55:35 -070031from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040032from utils import logging_utils
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -040033from utils import lru
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000034from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040035from utils import on_error
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000036from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000037from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000038
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080039import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040040import isolated_format
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080041
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000042
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here are a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout whole download will be aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of files never archived, matched against each path component's
# basename.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None
107
class Error(Exception):
  """Generic runtime error."""
111
112
class IsolatedErrorNoCommand(isolated_format.IsolatedError):
  """Signals an early abort due to lack of command specified."""
116
117
class Aborted(Error):
  """Operation aborted."""
121
122
def stream_read(stream, chunk_size):
  """Reads chunks from |stream| and yields them."""
  while True:
    piece = stream.read(chunk_size)
    if not piece:
      # EOF (or an empty read): stop iterating.
      return
    yield piece
130
131
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      piece = f.read(chunk_size)
      if not piece:
        return
      yield piece
142
143
def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  parent = os.path.dirname(path)
  if not fs.isdir(parent):
    fs.makedirs(parent)
  written = 0
  with fs.open(path, 'wb') as out:
    for chunk in content_generator:
      out.write(chunk)
      written += len(chunk)
  return written
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +0000162
163
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  zipper = zlib.compressobj(level)
  for piece in content_generator:
    out = zipper.compress(piece)
    # compressobj buffers internally; skip empty intermediate results.
    if out:
      yield out
  trailing = zipper.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
174
175
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      # Cap the output at |chunk_size|; any extra input stays buffered in
      # decompressor.unconsumed_tail.
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # Drain whatever input was left over, again in |chunk_size| slices.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
206
207
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store, no compression) for file types that are already
  compressed, 7 otherwise.
  """
  # os.path.splitext() keeps the leading dot ('.zip'); strip it so the value
  # actually matches the dot-less entries in ALREADY_COMPRESSED_TYPES
  # ('zip', 'png', ...). Without the strip the lookup never matched and
  # already-compressed files were recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
213
214
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file.
  directories = set()
  for f in files:
    path = os.path.dirname(f)
    while path and path not in directories:
      directories.add(path)
      path = os.path.dirname(path)
  # sorted() guarantees parents are created before their children.
  for d in sorted(directories):
    fs.mkdir(os.path.join(base_directory, d))
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000227
228
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    # Only entries carrying an 'l' (link target) property are symlinks.
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    # os.symlink() doesn't exist on Windows.
    os.symlink(  # pylint: disable=E1101
        properties['l'], os.path.join(base_directory, filepath))
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000241
242
def is_valid_file(path, size):
  """Determines if the given files appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    # No expected size: existence is the only check possible.
    return fs.isfile(path)
  actual = fs.stat(path).st_size
  if actual == size:
    return True
  logging.warning(
      'Found invalid item %s; %d != %d',
      os.path.basename(path), actual, size)
  return False
257
258
class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in a main thread, travels
  to 'contains' thread, then to 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    # Hex digest of the content; None until computed by prepare().
    self.digest = digest
    # Size in bytes; None until computed by prepare().
    self.size = size
    # Hint that this item should jump the upload queue.
    self.high_priority = high_priority
    # Default zlib compression level; subclasses may override per file type.
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is not None and self.size is not None:
      return
    hasher = hash_algo()
    total = 0
    for chunk in self.content():
      hasher.update(chunk)
      total += len(chunk)
    self.digest = hasher.hexdigest()
    self.size = total
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000298
299
class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Fall back to the on-disk size when the caller didn't supply one.
    if size is None:
      size = fs.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    # Already-compressed file types get level 0 (store).
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000317
318
class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Size is known up front; the digest is computed lazily by Item.prepare().
    super(BufferItem, self).__init__(
        digest=None, size=len(buf), high_priority=high_priority)
    self.buffer = buf

  def content(self):
    # A single chunk: the whole buffer.
    return [self.buffer]
328
329
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000330class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800331 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000332
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800333 Implements compression support, parallel 'contains' checks, parallel uploads
334 and more.
335
336 Works only within single namespace (and thus hashing algorithm and compression
337 scheme are fixed).
338
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400339 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
340 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800341 """
342
  def __init__(self, storage_api):
    # Low-level transport used for all server communication.
    self._storage_api = storage_api
    # Whether content is zipped before upload; derived from the namespace.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    # Hash algorithm used to derive item digests; derived from the namespace.
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools, created lazily by cpu_thread_pool / net_thread_pool.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Set to True by abort(); checked by in-flight tasks.
    self._aborted = False
    # Original signal handlers saved by __enter__, restored by __exit__.
    self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000352
  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace
373
374 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000375 def cpu_thread_pool(self):
376 """ThreadPool for CPU-bound tasks like zipping."""
377 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500378 threads = max(threading_utils.num_processors(), 2)
379 if sys.maxsize <= 2L**32:
380 # On 32 bits userland, do not try to use more than 16 threads.
381 threads = min(threads, 16)
382 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000383 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000384
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000385 @property
386 def net_thread_pool(self):
387 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
388 if self._net_thread_pool is None:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700389 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000390 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000391
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000392 def close(self):
393 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400394 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000395 if self._cpu_thread_pool:
396 self._cpu_thread_pool.join()
397 self._cpu_thread_pool.close()
398 self._cpu_thread_pool = None
399 if self._net_thread_pool:
400 self._net_thread_pool.join()
401 self._net_thread_pool.close()
402 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400403 logging.info('Done.')
404
405 def abort(self):
406 """Cancels any pending or future operations."""
407 # This is not strictly theadsafe, but in the worst case the logging message
408 # will be printed twice. Not a big deal. In other places it is assumed that
409 # unprotected reads and writes to _aborted are serializable (it is true
410 # for python) and thus no locking is used.
411 if not self._aborted:
412 logging.warning('Aborting... It can take a while.')
413 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000414
  def __enter__(self):
    """Context manager interface."""
    # Install Ctrl+C / termination handlers that abort pending operations;
    # the previous handlers are saved so __exit__ can restore them.
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self
421
  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    # Restore the signal handlers saved by __enter__.
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False
429
  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represents data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish. Items come back on |channel|
        # in completion order, not submission order.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded
504
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Hand the zipped blob over to the network pool; the result (or
      # exception) lands on |channel|.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000556
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800557 def push(self, item, push_state):
558 """Synchronously pushes a single item to the server.
559
560 If you need to push many items at once, consider using 'upload_items' or
561 'async_push' with instance of TaskChannel.
562
563 Arguments:
564 item: item to upload as instance of Item class.
565 push_state: push state returned by 'get_missing_items' call for |item|.
566
567 Returns:
568 Pushed item (same object as |item|).
569 """
570 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700571 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800572 self.async_push(channel, item, push_state)
573 pushed = channel.pull()
574 assert pushed is item
575 return item
576
  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
605
  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      # Bail out instead of hitting the server if the Storage was aborted.
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in. Each pull() returns one batch's
    # {missing item: push_state} mapping.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000644
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000645
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  # ITEMS_PER_CONTAINS_QUERIES gives the size of the n-th batch; past the end
  # of the list, the last entry is repeated forever.
  batches_emitted = 0
  limit = ITEMS_PER_CONTAINS_QUERIES[0]
  current = []
  # Biggest items first; presumably so their (slow) uploads can start as early
  # as possible.
  for entry in sorted(items, key=lambda x: x.size, reverse=True):
    current.append(entry)
    if len(current) == limit:
      yield current
      current = []
      batches_emitted += 1
      index = min(batches_emitted, len(ITEMS_PER_CONTAINS_QUERIES) - 1)
      limit = ITEMS_PER_CONTAINS_QUERIES[index]
  if current:
    yield current
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000672
673
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    # Receives digests back from Storage as downloads complete.
    self._channel = threading_utils.TaskChannel()
    # Digests with a fetch currently in flight.
    self._pending = set()
    # Every digest ever requested via add(); checked by verify_all_cached.
    self._accessed = set()
    # Digests known to be in |cache|, seeded with its current content.
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|.

    Arguments:
      digest: hex digest of the item to fetch.
      size: expected size of the item, passed to cache.touch for validation.
      priority: thread pool priority for the download task.
    """
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching. Every completed fetch
    # (requested here or not) is moved from _pending to _fetched along the way.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage.

    Arguments:
      path: path of the file on disk.
      algo: hashing constructor (e.g. hashlib.sha1) used to name the content.

    Returns:
      Hex digest of the injected content.
    """
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())
769
770
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    """Arguments:
      stream: iterable of str chunks to verify and forward.
      expected_size: total decompressed size, or UNKNOWN_FILE_SIZE to skip
          the size check.
    """
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Keep exactly one chunk buffered. A buffered chunk is only released once
    # we know whether another chunk follows it, so the final chunk can be
    # size-checked (is_last=True) before the consumer ever sees it.
    buffered = None
    have_buffered = False
    for incoming in self.stream:
      assert incoming is not None
      if have_buffered:
        self._inspect_chunk(buffered, is_last=False)
        try:
          yield buffered
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      buffered = incoming
      have_buffered = True
    if have_buffered:
      self._inspect_chunk(buffered, is_last=True)
      try:
        yield buffered
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if is_last:
      size_is_known = self.expected_size != UNKNOWN_FILE_SIZE
      if size_is_known and self.expected_size != self.current_size:
        raise IOError('Incorrect file size: expected %d, got %d' % (
            self.expected_size, self.current_size))
818
819
class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious to the compression and hashing scheme used. These
  details are handled in the higher level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, data that should be pushed
    and data provided by the item is not the same. In that case |content| is
    not None and it yields chunks of compressed data (using item.content() as
    a source of original uncompressed data). This is implemented by Storage
    class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()
893
894
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800895class _IsolateServerPushState(object):
896 """Per-item state passed from IsolateServer.contains to IsolateServer.push.
Mike Frysinger27f03da2014-02-12 16:47:01 -0500897
898 Note this needs to be a global class to support pickling.
899 """
900
Cory Massarocc19c8c2015-03-10 13:35:11 -0700901 def __init__(self, preupload_status, size):
902 self.preupload_status = preupload_status
903 gs_upload_url = preupload_status.get('gs_upload_url') or None
904 if gs_upload_url:
905 self.upload_url = gs_upload_url
906 self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
907 else:
908 self.upload_url = '_ah/api/isolateservice/v1/store_inline'
909 self.finalize_url = None
Mike Frysinger27f03da2014-02-12 16:47:01 -0500910 self.uploaded = False
911 self.finalized = False
Marc-Antoine Ruele98dde92015-01-22 14:53:05 -0500912 self.size = size
Mike Frysinger27f03da2014-02-12 16:47:01 -0500913
914
class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    # Namespace descriptor sent with every JSON API call; the compression
    # field is derived from the namespace name suffix.
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    # Guards _server_caps and _memory_use below.
    self._lock = threading.Lock()
    # Lazily fetched /server_details response; see _server_capabilities.
    self._server_caps = None
    # Total bytes of upload payloads currently buffered in memory; see push().
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    """See StorageApi.fetch.

    Returns either the decoded inline content (DB-stored items) or a chunk
    generator streaming from Google Storage.
    """
    assert offset >= 0
    source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self.do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      return base64.b64decode(content)

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to a spec, <size> can be '*' meaning "Total size of the file
      # is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    return stream_read(connection, NET_IO_FILE_CHUNK)

  def push(self, item, push_state, content=None):
    """See StorageApi.push.

    Serializes |content| in memory before uploading and throttles concurrent
    uploads so the total buffered data stays under a process-wide cap
    (accounted in self._memory_use).
    """
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    if isinstance(content, (basestring, list)):
      # Memory is already used, too late.
      with self._lock:
        self._memory_use += push_state.size
    else:
      # TODO(vadimsh): Do not read from |content| generator when retrying push.
      # If |content| is indeed a generator, it can not be re-winded back to the
      # beginning of the stream. A retry will find it exhausted. A possible
      # solution is to wrap |content| generator with some sort of caching
      # restartable generator. It should be done alongside streaming support
      # implementation.
      #
      # In theory, we should keep the generator, so that it is not serialized in
      # memory. Sadly net.HttpService.request() requires the body to be
      # serialized.
      assert isinstance(content, types.GeneratorType), repr(content)
      slept = False
      # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
      # NOTE(review): an older comment here said "one byte less than 512mb",
      # but the code caps at a quarter of sys.maxsize: ~512MiB on 32 bit
      # python, vastly larger on 64 bit. Confirm the intended limit.
      max_size = int(sys.maxsize * 0.25)
      while True:
        with self._lock:
          # This is due to 32 bits python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since it's what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling one upload at once, we can survive. Otherwise, kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after failed finalization call below,
      # no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      # Always release the memory budget, even when the upload failed.
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    """See StorageApi.contains."""
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
          {
            'digest': item.digest,
            'is_isolated': bool(item.high_priority),
            'size': item.size,
          } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _PushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      # 'index' maps the response entry back to its position in |items|.
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items

  def do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hex digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      Decoded JSON response from the /retrieve endpoint.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance that selects between an
          inline DB upload and a direct Google Storage PUT.
      content: an iterable that yields 'str' chunks.

    Returns:
      True on success (truthy server response).
    """
    # A cheezy way to avoid memcpy of (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None
1206
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001207
class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  # Path of the directory backing the cache; None for implementations that
  # keep everything in memory.
  cache_dir = None

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    # Returning False never suppresses exceptions raised in the 'with' block.
    return False

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache."""
    raise NotImplementedError()

  def hardlink(self, digest, dest, file_mode):
    """Ensures file at |dest| has same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.
    """
    raise NotImplementedError()
1259
1260
1261class MemoryCache(LocalCache):
1262 """LocalCache implementation that stores everything in memory."""
1263
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001264 def __init__(self, file_mode_mask=0500):
1265 """Args:
1266 file_mode_mask: bit mask to AND file mode with. Default value will make
1267 all mapped files to be read only.
1268 """
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001269 super(MemoryCache, self).__init__()
Vadim Shtayurae3fbd102014-04-29 17:05:21 -07001270 self._file_mode_mask = file_mode_mask
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001271 # Let's not assume dict is thread safe.
1272 self._lock = threading.Lock()
1273 self._contents = {}
1274
1275 def cached_set(self):
1276 with self._lock:
1277 return set(self._contents)
1278
1279 def touch(self, digest, size):
1280 with self._lock:
1281 return digest in self._contents
1282
1283 def evict(self, digest):
1284 with self._lock:
1285 self._contents.pop(digest, None)
1286
1287 def read(self, digest):
1288 with self._lock:
1289 return self._contents[digest]
1290
1291 def write(self, digest, content):
1292 # Assemble whole stream before taking the lock.
1293 data = ''.join(content)
1294 with self._lock:
1295 self._contents[digest] = data
1296
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001297 def hardlink(self, digest, dest, file_mode):
1298 """Since data is kept in memory, there is no filenode to hardlink."""
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001299 file_write(dest, [self.read(digest)])
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001300 if file_mode is not None:
maruel12e30012015-10-09 11:55:35 -07001301 fs.chmod(dest, file_mode & self._file_mode_mask)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001302
1303
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001304class CachePolicies(object):
1305 def __init__(self, max_cache_size, min_free_space, max_items):
1306 """
1307 Arguments:
1308 - max_cache_size: Trim if the cache gets larger than this value. If 0, the
1309 cache is effectively a leak.
1310 - min_free_space: Trim if disk free space becomes lower than this value. If
1311 0, it unconditionally fill the disk.
1312 - max_items: Maximum number of items to keep in the cache. If 0, do not
1313 enforce a limit.
1314 """
1315 self.max_cache_size = max_cache_size
1316 self.min_free_space = min_free_space
1317 self.max_items = max_items
1318
1319
1320class DiskCache(LocalCache):
1321 """Stateful LRU cache in a flat hash table in a directory.
1322
1323 Saves its state as json file.
1324 """
maruel12e30012015-10-09 11:55:35 -07001325 STATE_FILE = u'state.json'
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001326
1327 def __init__(self, cache_dir, policies, hash_algo):
1328 """
1329 Arguments:
1330 cache_dir: directory where to place the cache.
1331 policies: cache retention policies.
1332 algo: hashing algorithm used.
1333 """
1334 super(DiskCache, self).__init__()
1335 self.cache_dir = cache_dir
1336 self.policies = policies
1337 self.hash_algo = hash_algo
1338 self.state_file = os.path.join(cache_dir, self.STATE_FILE)
1339
1340 # All protected methods (starting with '_') except _path should be called
1341 # with this lock locked.
1342 self._lock = threading_utils.LockWithAssert()
1343 self._lru = lru.LRUDict()
1344
1345 # Profiling values.
1346 self._added = []
1347 self._removed = []
1348 self._free_disk = 0
1349
1350 with tools.Profiler('Setup'):
1351 with self._lock:
1352 self._load()
1353
1354 def __enter__(self):
1355 return self
1356
1357 def __exit__(self, _exc_type, _exec_value, _traceback):
1358 with tools.Profiler('CleanupTrimming'):
1359 with self._lock:
1360 self._trim()
1361
1362 logging.info(
1363 '%5d (%8dkb) added',
1364 len(self._added), sum(self._added) / 1024)
1365 logging.info(
1366 '%5d (%8dkb) current',
1367 len(self._lru),
1368 sum(self._lru.itervalues()) / 1024)
1369 logging.info(
1370 '%5d (%8dkb) removed',
1371 len(self._removed), sum(self._removed) / 1024)
1372 logging.info(
1373 ' %8dkb free',
1374 self._free_disk / 1024)
1375 return False
1376
1377 def cached_set(self):
1378 with self._lock:
1379 return self._lru.keys_set()
1380
1381 def touch(self, digest, size):
1382 """Verifies an actual file is valid.
1383
1384 Note that is doesn't compute the hash so it could still be corrupted if the
1385 file size didn't change.
1386
1387 TODO(maruel): More stringent verification while keeping the check fast.
1388 """
1389 # Do the check outside the lock.
1390 if not is_valid_file(self._path(digest), size):
1391 return False
1392
1393 # Update it's LRU position.
1394 with self._lock:
1395 if digest not in self._lru:
1396 return False
1397 self._lru.touch(digest)
1398 return True
1399
1400 def evict(self, digest):
1401 with self._lock:
1402 self._lru.pop(digest)
1403 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1404
1405 def read(self, digest):
maruel12e30012015-10-09 11:55:35 -07001406 with fs.open(self._path(digest), 'rb') as f:
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001407 return f.read()
1408
1409 def write(self, digest, content):
Marc-Antoine Rueldf4976d2015-04-15 19:56:21 -04001410 assert content is not None
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001411 path = self._path(digest)
1412 # A stale broken file may remain. It is possible for the file to have write
1413 # access bit removed which would cause the file_write() call to fail to open
1414 # in write mode. Take no chance here.
1415 file_path.try_remove(path)
1416 try:
1417 size = file_write(path, content)
1418 except:
1419 # There are two possible places were an exception can occur:
1420 # 1) Inside |content| generator in case of network or unzipping errors.
1421 # 2) Inside file_write itself in case of disk IO errors.
1422 # In any case delete an incomplete file and propagate the exception to
1423 # caller, it will be logged there.
1424 file_path.try_remove(path)
1425 raise
1426 # Make the file read-only in the cache. This has a few side-effects since
1427 # the file node is modified, so every directory entries to this file becomes
1428 # read-only. It's fine here because it is a new file.
1429 file_path.set_read_only(path, True)
1430 with self._lock:
1431 self._add(digest, size)
1432
1433 def hardlink(self, digest, dest, file_mode):
1434 """Hardlinks the file to |dest|.
1435
1436 Note that the file permission bits are on the file node, not the directory
1437 entry, so changing the access bit on any of the directory entries for the
1438 file node will affect them all.
1439 """
1440 path = self._path(digest)
maruel1f7e8162015-09-16 10:35:43 -07001441 if not file_path.link_file(dest, path, file_path.HARDLINK_WITH_FALLBACK):
1442 # Report to the server that it failed with more details. We'll want to
1443 # squash them all.
1444 on_error.report('Failed to hardlink\n%s -> %s' % (path, dest))
1445
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001446 if file_mode is not None:
1447 # Ignores all other bits.
maruel12e30012015-10-09 11:55:35 -07001448 fs.chmod(dest, file_mode & 0500)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001449
1450 def _load(self):
1451 """Loads state of the cache from json file."""
1452 self._lock.assert_locked()
1453
1454 if not os.path.isdir(self.cache_dir):
maruel12e30012015-10-09 11:55:35 -07001455 fs.makedirs(self.cache_dir)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001456 else:
1457 # Make sure the cache is read-only.
1458 # TODO(maruel): Calculate the cost and optimize the performance
1459 # accordingly.
1460 file_path.make_tree_read_only(self.cache_dir)
1461
1462 # Load state of the cache.
maruel12e30012015-10-09 11:55:35 -07001463 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001464 try:
1465 self._lru = lru.LRUDict.load(self.state_file)
1466 except ValueError as err:
1467 logging.error('Failed to load cache state: %s' % (err,))
1468 # Don't want to keep broken state file.
1469 file_path.try_remove(self.state_file)
1470
1471 # Ensure that all files listed in the state still exist and add new ones.
1472 previous = self._lru.keys_set()
1473 unknown = []
maruel12e30012015-10-09 11:55:35 -07001474 for filename in fs.listdir(self.cache_dir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001475 if filename == self.STATE_FILE:
1476 continue
1477 if filename in previous:
1478 previous.remove(filename)
1479 continue
1480 # An untracked file.
1481 if not isolated_format.is_valid_hash(filename, self.hash_algo):
1482 logging.warning('Removing unknown file %s from cache', filename)
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001483 p = self._path(filename)
maruel12e30012015-10-09 11:55:35 -07001484 if fs.isdir(p):
Marc-Antoine Ruel8cd33372015-02-09 12:54:43 -05001485 try:
1486 file_path.rmtree(p)
1487 except OSError:
1488 pass
1489 else:
1490 file_path.try_remove(p)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001491 continue
1492 # File that's not referenced in 'state.json'.
1493 # TODO(vadimsh): Verify its SHA1 matches file name.
1494 logging.warning('Adding unknown file %s to cache', filename)
1495 unknown.append(filename)
1496
1497 if unknown:
1498 # Add as oldest files. They will be deleted eventually if not accessed.
1499 self._add_oldest_list(unknown)
1500 logging.warning('Added back %d unknown files', len(unknown))
1501
1502 if previous:
1503 # Filter out entries that were not found.
1504 logging.warning('Removed %d lost files', len(previous))
1505 for filename in previous:
1506 self._lru.pop(filename)
1507 self._trim()
1508
1509 def _save(self):
1510 """Saves the LRU ordering."""
1511 self._lock.assert_locked()
1512 if sys.platform != 'win32':
1513 d = os.path.dirname(self.state_file)
maruel12e30012015-10-09 11:55:35 -07001514 if fs.isdir(d):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001515 # Necessary otherwise the file can't be created.
1516 file_path.set_read_only(d, False)
maruel12e30012015-10-09 11:55:35 -07001517 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001518 file_path.set_read_only(self.state_file, False)
1519 self._lru.save(self.state_file)
1520
1521 def _trim(self):
1522 """Trims anything we don't know, make sure enough free space exists."""
1523 self._lock.assert_locked()
1524
1525 # Ensure maximum cache size.
1526 if self.policies.max_cache_size:
1527 total_size = sum(self._lru.itervalues())
1528 while total_size > self.policies.max_cache_size:
1529 total_size -= self._remove_lru_file()
1530
1531 # Ensure maximum number of items in the cache.
1532 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1533 for _ in xrange(len(self._lru) - self.policies.max_items):
1534 self._remove_lru_file()
1535
1536 # Ensure enough free space.
1537 self._free_disk = file_path.get_free_space(self.cache_dir)
1538 trimmed_due_to_space = False
1539 while (
1540 self.policies.min_free_space and
1541 self._lru and
1542 self._free_disk < self.policies.min_free_space):
1543 trimmed_due_to_space = True
1544 self._remove_lru_file()
1545 self._free_disk = file_path.get_free_space(self.cache_dir)
1546 if trimmed_due_to_space:
1547 total_usage = sum(self._lru.itervalues())
1548 usage_percent = 0.
1549 if total_usage:
1550 usage_percent = 100. * self.policies.max_cache_size / float(total_usage)
1551 logging.warning(
1552 'Trimmed due to not enough free disk space: %.1fkb free, %.1fkb '
1553 'cache (%.1f%% of its maximum capacity)',
1554 self._free_disk / 1024.,
1555 total_usage / 1024.,
1556 usage_percent)
1557 self._save()
1558
1559 def _path(self, digest):
1560 """Returns the path to one item."""
1561 return os.path.join(self.cache_dir, digest)
1562
1563 def _remove_lru_file(self):
1564 """Removes the last recently used file and returns its size."""
1565 self._lock.assert_locked()
1566 digest, size = self._lru.pop_oldest()
1567 self._delete_file(digest, size)
1568 return size
1569
1570 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1571 """Adds an item into LRU cache marking it as a newest one."""
1572 self._lock.assert_locked()
1573 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001574 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001575 self._added.append(size)
1576 self._lru.add(digest, size)
1577
1578 def _add_oldest_list(self, digests):
1579 """Adds a bunch of items into LRU cache marking them as oldest ones."""
1580 self._lock.assert_locked()
1581 pairs = []
1582 for digest in digests:
maruel12e30012015-10-09 11:55:35 -07001583 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001584 self._added.append(size)
1585 pairs.append((digest, size))
1586 self._lru.batch_insert_oldest(pairs)
1587
1588 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1589 """Deletes cache file from the file system."""
1590 self._lock.assert_locked()
1591 try:
1592 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001593 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001594 file_path.try_remove(self._path(digest))
1595 self._removed.append(size)
1596 except OSError as e:
1597 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
1598
1599
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001600class IsolatedBundle(object):
1601 """Fetched and parsed .isolated file with all dependencies."""
1602
Vadim Shtayura3148e072014-09-02 18:51:52 -07001603 def __init__(self):
1604 self.command = []
1605 self.files = {}
1606 self.read_only = None
1607 self.relative_cwd = None
1608 # The main .isolated file, a IsolatedFile instance.
1609 self.root = None
1610
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001611 def fetch(self, fetch_queue, root_isolated_hash, algo):
1612 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001613
1614 It enables support for "included" .isolated files. They are processed in
1615 strict order but fetched asynchronously from the cache. This is important so
1616 that a file in an included .isolated file that is overridden by an embedding
1617 .isolated file is not fetched needlessly. The includes are fetched in one
1618 pass and the files are fetched as soon as all the ones on the left-side
1619 of the tree were fetched.
1620
1621 The prioritization is very important here for nested .isolated files.
1622 'includes' have the highest priority and the algorithm is optimized for both
1623 deep and wide trees. A deep one is a long link of .isolated files referenced
1624 one at a time by one item in 'includes'. A wide one has a large number of
1625 'includes' in a single .isolated file. 'left' is defined as an included
1626 .isolated file earlier in the 'includes' list. So the order of the elements
1627 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001628
1629 As a side effect this method starts asynchronous fetch of all data files
1630 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1631 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001632 """
1633 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1634
1635 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1636 pending = {}
1637 # Set of hashes of already retrieved items to refuse recursive includes.
1638 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001639 # Set of IsolatedFile's whose data files have already being fetched.
1640 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001641
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001642 def retrieve_async(isolated_file):
Vadim Shtayura3148e072014-09-02 18:51:52 -07001643 h = isolated_file.obj_hash
1644 if h in seen:
1645 raise isolated_format.IsolatedError(
1646 'IsolatedFile %s is retrieved recursively' % h)
1647 assert h not in pending
1648 seen.add(h)
1649 pending[h] = isolated_file
1650 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1651
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001652 # Start fetching root *.isolated file (single file, not the whole bundle).
1653 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001654
1655 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001656 # Wait until some *.isolated file is fetched, parse it.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001657 item_hash = fetch_queue.wait(pending)
1658 item = pending.pop(item_hash)
1659 item.load(fetch_queue.cache.read(item_hash))
Vadim Shtayura3148e072014-09-02 18:51:52 -07001660
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001661 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001662 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001663 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001664
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001665 # Always fetch *.isolated files in traversal order, waiting if necessary
1666 # until next to-be-processed node loads. "Waiting" is done by yielding
1667 # back to the outer loop, that waits until some *.isolated is loaded.
1668 for node in isolated_format.walk_includes(self.root):
1669 if node not in processed:
1670 # Not visited, and not yet loaded -> wait for it to load.
1671 if not node.is_loaded:
1672 break
1673 # Not visited and loaded -> process it and continue the traversal.
1674 self._start_fetching_files(node, fetch_queue)
1675 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001676
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001677 # All *.isolated files should be processed by now and only them.
1678 all_isolateds = set(isolated_format.walk_includes(self.root))
1679 assert all_isolateds == processed, (all_isolateds, processed)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001680
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001681 # Extract 'command' and other bundle properties.
1682 for node in isolated_format.walk_includes(self.root):
1683 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001684 self.relative_cwd = self.relative_cwd or ''
1685
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001686 def _start_fetching_files(self, isolated, fetch_queue):
1687 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001688
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001689 Modifies self.files.
1690 """
1691 logging.debug('fetch_files(%s)', isolated.obj_hash)
1692 for filepath, properties in isolated.data.get('files', {}).iteritems():
1693 # Root isolated has priority on the files being mapped. In particular,
1694 # overridden files must not be fetched.
1695 if filepath not in self.files:
1696 self.files[filepath] = properties
1697 if 'h' in properties:
1698 # Preemptively request files.
1699 logging.debug('fetching %s', filepath)
1700 fetch_queue.add(
1701 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1702
1703 def _update_self(self, node):
1704 """Extracts bundle global parameters from loaded *.isolated file.
1705
1706 Will be called with each loaded *.isolated file in order of traversal of
1707 isolated include graph (see isolated_format.walk_includes).
1708 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001709 # Grabs properties.
1710 if not self.command and node.data.get('command'):
1711 # Ensure paths are correctly separated on windows.
1712 self.command = node.data['command']
1713 if self.command:
1714 self.command[0] = self.command[0].replace('/', os.path.sep)
1715 self.command = tools.fix_python_path(self.command)
1716 if self.read_only is None and node.data.get('read_only') is not None:
1717 self.read_only = node.data['read_only']
1718 if (self.relative_cwd is None and
1719 node.data.get('relative_cwd') is not None):
1720 self.relative_cwd = node.data['relative_cwd']
1721
1722
Vadim Shtayura8623c272014-12-01 11:45:27 -08001723def set_storage_api_class(cls):
1724 """Replaces StorageApi implementation used by default."""
1725 global _storage_api_cls
1726 assert _storage_api_cls is None
1727 assert issubclass(cls, StorageApi)
1728 _storage_api_cls = cls
1729
1730
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001731def get_storage_api(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001732 """Returns an object that implements low-level StorageApi interface.
1733
1734 It is used by Storage to work with single isolate |namespace|. It should
1735 rarely be used directly by clients, see 'get_storage' for
1736 a better alternative.
1737
1738 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001739 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001740 namespace: isolate namespace to operate in, also defines hashing and
1741 compression scheme used, i.e. namespace names that end with '-gzip'
1742 store compressed data.
1743
1744 Returns:
1745 Instance of StorageApi subclass.
1746 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001747 cls = _storage_api_cls or IsolateServer
1748 return cls(url, namespace)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001749
1750
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001751def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001752 """Returns Storage class that can upload and download from |namespace|.
1753
1754 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001755 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001756 namespace: isolate namespace to operate in, also defines hashing and
1757 compression scheme used, i.e. namespace names that end with '-gzip'
1758 store compressed data.
1759
1760 Returns:
1761 Instance of Storage.
1762 """
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001763 return Storage(get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001764
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001765
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001766def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001767 """Uploads the given tree to the given url.
1768
1769 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001770 base_url: The url of the isolate server to upload to.
1771 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001772 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001773 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001774 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001775 # Filter out symlinks, since they are not represented by items on isolate
1776 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001777 items = []
1778 seen = set()
1779 skipped = 0
1780 for filepath, metadata in infiles:
maruel12e30012015-10-09 11:55:35 -07001781 assert isinstance(filepath, unicode), filepath
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001782 if 'l' not in metadata and filepath not in seen:
1783 seen.add(filepath)
1784 item = FileItem(
1785 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001786 digest=metadata['h'],
1787 size=metadata['s'],
1788 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001789 items.append(item)
1790 else:
1791 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001792
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001793 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001794 with get_storage(base_url, namespace) as storage:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001795 storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001796
1797
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001798def fetch_isolated(isolated_hash, storage, cache, outdir, require_command):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001799 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001800
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001801 Arguments:
1802 isolated_hash: hash of the root *.isolated file.
1803 storage: Storage class that communicates with isolate storage.
1804 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001805 outdir: Output directory to map file tree to.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001806 require_command: Ensure *.isolated specifies a command to run.
1807
1808 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001809 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001810 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001811 logging.debug(
1812 'fetch_isolated(%s, %s, %s, %s, %s)',
1813 isolated_hash, storage, cache, outdir, require_command)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001814 # Hash algorithm to use, defined by namespace |storage| is using.
1815 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001816 with cache:
1817 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001818 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001819
1820 with tools.Profiler('GetIsolateds'):
1821 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001822 if not isolated_format.is_valid_hash(isolated_hash, algo):
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001823 logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
maruel1ceb3872015-10-14 06:10:44 -07001824 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001825 try:
maruel1ceb3872015-10-14 06:10:44 -07001826 isolated_hash = fetch_queue.inject_local_file(path, algo)
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001827 except IOError:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001828 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001829 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
1830 'valid hash?' % isolated_hash)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001831
1832 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001833 bundle.fetch(fetch_queue, isolated_hash, algo)
1834 if require_command and not bundle.command:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001835 # TODO(vadimsh): All fetch operations are already enqueue and there's no
1836 # easy way to cancel them.
maruela72f46e2016-02-24 11:05:45 -08001837 raise IsolatedErrorNoCommand()
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001838
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001839 with tools.Profiler('GetRest'):
1840 # Create file system hierarchy.
maruel12e30012015-10-09 11:55:35 -07001841 if not fs.isdir(outdir):
1842 fs.makedirs(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001843 create_directories(outdir, bundle.files)
1844 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001845
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001846 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001847 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
maruel12e30012015-10-09 11:55:35 -07001848 if not fs.isdir(cwd):
1849 fs.makedirs(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001850
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001851 # Multimap: digest -> list of pairs (path, props).
1852 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001853 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001854 if 'h' in props:
1855 remaining.setdefault(props['h'], []).append((filepath, props))
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001856
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001857 # Now block on the remaining files to be downloaded and mapped.
1858 logging.info('Retrieving remaining files (%d of them)...',
1859 fetch_queue.pending_count)
1860 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001861 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001862 while remaining:
1863 detector.ping()
1864
1865 # Wait for any item to finish fetching to cache.
1866 digest = fetch_queue.wait(remaining)
1867
1868 # Link corresponding files to a fetched item in cache.
1869 for filepath, props in remaining.pop(digest):
Marc-Antoine Ruelfb199cf2013-11-12 15:38:12 -05001870 cache.hardlink(
1871 digest, os.path.join(outdir, filepath), props.get('m'))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001872
1873 # Report progress.
1874 duration = time.time() - last_update
1875 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1876 msg = '%d files remaining...' % len(remaining)
1877 print msg
1878 logging.info(msg)
1879 last_update = time.time()
1880
1881 # Cache could evict some items we just tried to fetch, it's a fatal error.
1882 if not fetch_queue.verify_all_cached():
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001883 raise isolated_format.MappingError(
1884 'Cache is too small to hold all requested files')
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001885 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001886
1887
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001888def directory_to_metadata(root, algo, blacklist):
1889 """Returns the FileItem list and .isolated metadata for a directory."""
1890 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001891 paths = isolated_format.expand_directory_and_symlink(
Vadim Shtayura439d3fc2014-05-07 16:05:12 -07001892 root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001893 metadata = {
1894 relpath: isolated_format.file_to_metadata(
Marc-Antoine Ruelf1d827c2014-11-24 15:22:25 -05001895 os.path.join(root, relpath), {}, 0, algo)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001896 for relpath in paths
1897 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001898 for v in metadata.itervalues():
1899 v.pop('t')
1900 items = [
1901 FileItem(
1902 path=os.path.join(root, relpath),
1903 digest=meta['h'],
1904 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001905 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001906 for relpath, meta in metadata.iteritems() if 'h' in meta
1907 ]
1908 return items, metadata
1909
1910
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001911def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001912 """Stores every entries and returns the relevant data.
1913
1914 Arguments:
1915 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001916 files: list of file paths to upload. If a directory is specified, a
1917 .isolated file is created and its hash is returned.
1918 blacklist: function that returns True if a file should be omitted.
1919 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001920 assert all(isinstance(i, unicode) for i in files), files
1921 if len(files) != len(set(map(os.path.abspath, files))):
1922 raise Error('Duplicate entries found.')
1923
1924 results = []
1925 # The temporary directory is only created as needed.
1926 tempdir = None
1927 try:
1928 # TODO(maruel): Yield the files to a worker thread.
1929 items_to_upload = []
1930 for f in files:
1931 try:
1932 filepath = os.path.abspath(f)
maruel12e30012015-10-09 11:55:35 -07001933 if fs.isdir(filepath):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001934 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001935 items, metadata = directory_to_metadata(
1936 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001937
1938 # Create the .isolated file.
1939 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04001940 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
1941 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001942 os.close(handle)
1943 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001944 'algo':
1945 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001946 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001947 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001948 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001949 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001950 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001951 items_to_upload.extend(items)
1952 items_to_upload.append(
1953 FileItem(
1954 path=isolated,
1955 digest=h,
maruel12e30012015-10-09 11:55:35 -07001956 size=fs.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001957 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001958 results.append((h, f))
1959
maruel12e30012015-10-09 11:55:35 -07001960 elif fs.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001961 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001962 items_to_upload.append(
1963 FileItem(
1964 path=filepath,
1965 digest=h,
maruel12e30012015-10-09 11:55:35 -07001966 size=fs.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001967 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001968 results.append((h, f))
1969 else:
1970 raise Error('%s is neither a file or directory.' % f)
1971 except OSError:
1972 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001973 # Technically we would care about which files were uploaded but we don't
1974 # much in practice.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001975 _uploaded_files = storage.upload_items(items_to_upload)
1976 return results
1977 finally:
maruel12e30012015-10-09 11:55:35 -07001978 if tempdir and fs.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001979 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001980
1981
def archive(out, namespace, files, blacklist):
  """Archives the given files/directories to the Isolate Server.

  Arguments:
    out: URL of the Isolate Server to upload to.
    namespace: namespace to use on the Isolate Server.
    files: list of paths to upload; the single entry '-' means read one path
        per line from stdin instead.
    blacklist: list of regexps used to filter out files when a directory is
        uploaded.

  Raises:
    Error: if there is nothing to upload or an entry cannot be processed.
  """
  if files == ['-']:
    # readlines() keeps the trailing line terminator on each line; strip it,
    # otherwise every path read from stdin would fail the file existence
    # checks downstream. Blank lines are skipped.
    files = [l.rstrip('\r\n') for l in sys.stdin.readlines() if l.strip()]

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    results = archive_files_to_storage(storage, files, blacklist)
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1994
1995
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created the whole directory
  is uploaded. Then this .isolated file can be included in another one to run
  commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  # Register the server and blacklist flags before parsing the command line.
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True)
  try:
    archive(
        options.isolate_server, options.namespace, files, options.blacklist)
  except Error as exc:
    # Surface upload failures as a command line error.
    parser.error(exc.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002017
2018
2019def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002020 """Download data from the server.
2021
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002022 It can either download individual files or a complete tree from a .isolated
2023 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002024 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002025 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002026 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05002027 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002028 help='hash of an isolated file, .isolated file content is discarded, use '
2029 '--file if you need it')
2030 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002031 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
2032 help='hash and destination of a file, can be used multiple times')
2033 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002034 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002035 help='destination directory')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002036 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002037 options, args = parser.parse_args(args)
2038 if args:
2039 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002040
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002041 process_isolate_server_options(parser, options, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00002042 if bool(options.isolated) == bool(options.file):
2043 parser.error('Use one of --isolated or --file, and only one.')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002044
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002045 cache = process_cache_options(options)
maruel12e30012015-10-09 11:55:35 -07002046 options.target = unicode(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002047 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07002048 if (fs.isfile(options.target) or
2049 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002050 parser.error(
2051 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002052 with get_storage(options.isolate_server, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08002053 # Fetching individual files.
2054 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002055 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08002056 channel = threading_utils.TaskChannel()
2057 pending = {}
2058 for digest, dest in options.file:
2059 pending[digest] = dest
2060 storage.async_fetch(
2061 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002062 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002063 digest,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002064 UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002065 functools.partial(file_write, os.path.join(options.target, dest)))
2066 while pending:
2067 fetched = channel.pull()
2068 dest = pending.pop(fetched)
2069 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002070
Vadim Shtayura3172be52013-12-03 12:49:05 -08002071 # Fetching whole isolated tree.
2072 if options.isolated:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002073 with cache:
2074 bundle = fetch_isolated(
2075 isolated_hash=options.isolated,
2076 storage=storage,
2077 cache=cache,
2078 outdir=options.target,
2079 require_command=False)
2080 if bundle.command:
2081 rel = os.path.join(options.target, bundle.relative_cwd)
2082 print('To run this test please run from the directory %s:' %
2083 os.path.join(options.target, rel))
2084 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002085
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002086 return 0
2087
2088
def add_archive_options(parser):
  """Registers the archiving-related flags on |parser|."""
  # Copy the defaults so that later mutation of options.blacklist (via
  # action='append') cannot touch the module-level DEFAULT_BLACKLIST.
  default_blacklist = list(DEFAULT_BLACKLIST)
  parser.add_option(
      '--blacklist',
      action='append',
      default=default_blacklist,
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
2095
2096
def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  # The ISOLATE_SERVER environment variable, when set, provides the default
  # server URL.
  env_default = os.environ.get('ISOLATE_SERVER', '')
  parser.add_option(
      '-I', '--isolate-server',
      default=env_default, metavar='URL',
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--namespace',
      default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
2108
2109
def process_isolate_server_options(parser, options, set_exception_handler):
  """Processes the --isolate-server option and aborts if not specified.

  Normalizes options.isolate_server in place; any invalid value aborts
  through parser.error().

  Returns the identity as determined by the server.
  """
  server = options.isolate_server
  if not server:
    parser.error('--isolate-server is required.')
  try:
    server = net.fix_url(server)
  except ValueError as exc:
    parser.error('--isolate-server %s' % exc)
  options.isolate_server = server
  if set_exception_handler:
    # Unexpected exceptions will be reported against this server on exit.
    on_error.report_on_exception_exit(server)
  try:
    return auth.ensure_logged_in(server)
  except ValueError as exc:
    parser.error(str(exc))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002127
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002128
def add_cache_options(parser):
  """Registers the local cache tuning flags on |parser| in their own group."""
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  # Trimming thresholds: 50GiB total size, 2GiB free disk, 100k items.
  group.add_option(
      '--max-cache-size',
      type='int', metavar='NNN', default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space',
      type='int', metavar='NNN', default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items',
      type='int', metavar='NNN', default=100000,
      help='Trim if more than this number of items are in the cache '
           'default=%default')
  parser.add_option_group(group)
2156
2157
def process_cache_options(options):
  """Returns the cache implementation selected by the command line flags.

  A DiskCache rooted at --cache when that flag is set, otherwise a
  MemoryCache.
  """
  if not options.cache:
    return MemoryCache()
  policies = CachePolicies(
      options.max_cache_size, options.min_free_space, options.max_items)
  cache_dir = unicode(os.path.abspath(options.cache))
  # |options.cache| path may not exist until DiskCache() instance is created.
  return DiskCache(
      cache_dir, policies, isolated_format.get_hash_algo(options.namespace))
2170
2171
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  """Option parser that also registers logging and authentication flags."""

  def __init__(self, **kwargs):
    # Derive the displayed program name from this module's file name.
    prog = os.path.basename(sys.modules[__name__].__file__)
    logging_utils.OptionParserWithLogging.__init__(
        self, version=__version__, prog=prog, **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    parent = logging_utils.OptionParserWithLogging
    options, args = parent.parse_args(self, *args, **kwargs)
    # Let the auth module consume the flags it registered in __init__.
    auth.process_auth_options(self, options)
    return options, args
2186
2187
def main(args):
  """Dispatches |args| to the matching CMD* subcommand in this module."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002191
2192
2193if __name__ == '__main__':
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002194 fix_encoding.fix_encoding()
2195 tools.disable_buffering()
2196 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002197 sys.exit(main(sys.argv[1:]))