#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.5.1'

import base64
import errno
import functools
import logging
import optparse
import os
import re
import signal
import sys
import tempfile
import threading
import time
import types
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024


# Read timeout in seconds for downloads from isolate storage. If there's no
# response from the server within this timeout, the whole download will be
# aborted.
DOWNLOAD_READ_TIMEOUT = 60


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)

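# Illustrative sketch (not executed): these are regular expressions matched
# against relative file paths, e.g.:
#   assert re.match(DEFAULT_BLACKLIST[0], 'foo.pyc')
#   assert re.match(DEFAULT_BLACKLIST[1], os.path.join('a', '.git'))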

# A class to use to communicate with the server by default. Can be changed by
# 'set_storage_api_class'. Default is IsolateServer.
_storage_api_cls = None


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directory as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total

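# Illustrative sketch (not executed): file_read() and file_write() are
# streaming counterparts, so a file can be copied without holding it fully in
# memory with:
#   file_write('/tmp/copy', file_read('/tmp/original'))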

def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

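# Illustrative sketch (not executed): zip_compress() and zip_decompress() are
# inverses of each other, so a round-trip through both is the identity:
#   data = 'hello world' * 100
#   compressed = ''.join(zip_compress([data]))
#   assert ''.join(zip_decompress([compressed])) == data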

def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot; strip it so the extension can
  # match the entries of ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7

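# Illustrative sketch (not executed): already compressed formats skip zlib
# compression entirely:
#   get_zip_compression_level('video.mp4')   # -> 0
#   get_zip_compression_level('binary.dll')  # -> 7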

def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Builds the set of directories to create, including all the intermediate
  # parents.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      fs.mkdir(os.path.join(base_directory, d))


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks the file's size.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  actual_size = fs.stat(path).st_size
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class Item(object):
  """An item to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from content(). If digest is provided, it MUST correspond to the
  hash algorithm used by Storage.

  When used with Storage, Item starts its life in the main thread, travels
  to the 'contains' thread, then to the 'push' thread and then finally back to
  the main thread. It is never used concurrently from multiple threads.
  """

  def __init__(self, digest=None, size=None, high_priority=False):
    self.digest = digest
    self.size = size
    self.high_priority = high_priority
    self.compression_level = 6

  def content(self):
    """Iterable with content of this item as byte string (str) chunks."""
    raise NotImplementedError()

  def prepare(self, hash_algo):
    """Ensures self.digest and self.size are set.

    Uses content() as a source of data to calculate them. Does nothing if
    digest and size are already known.

    Arguments:
      hash_algo: hash algorithm to use to calculate digest.
    """
    if self.digest is None or self.size is None:
      digest = hash_algo()
      total = 0
      for chunk in self.content():
        digest.update(chunk)
        total += len(chunk)
      self.digest = digest.hexdigest()
      self.size = total


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handler table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace)
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bit userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (this is true for Python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only the first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
    logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total:      %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with
    a 'get_missing_items' call. 'get_missing_items' returns a |push_state|
    object that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, item.content())
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through a verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific information
            describing how to upload the item (for example in case of cloud
            storage, it is signed upload URLs). It can later be passed to
            'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state

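# Illustrative usage sketch (not executed), assuming |storage_api| is a
# connected StorageApi instance (e.g. IsolateServer) and |paths| is a list of
# local file paths:
#   with Storage(storage_api) as storage:
#     uploaded = storage.upload_items([FileItem(p) for p in paths])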

def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries

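# Illustrative sketch (not executed): with ITEMS_PER_CONTAINS_QUERIES equal to
# (20, 20, 50, 50, 50, 100), batch_items_for_check() splits 150 items into
# batches of 20, 20, 50 and 50 items plus a final partial batch of 10, largest
# items first.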

class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait(self, digests):
    """Starts a loop that waits for at least one of |digests| to be retrieved.

    Returns the first digest retrieved.
    """
    # Flush any already fetched items.
    for digest in digests:
      if digest in self._fetched:
        return digest

    # Ensure all requested items are being fetched now.
    assert all(digest in self._pending for digest in digests), (
        digests, self._pending)

    # Wait for some requested item to finish fetching.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in digests:
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())

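# Illustrative usage sketch (not executed), assuming |storage| is a Storage
# instance, |cache| is a LocalCache implementation and |digest| is the hex
# digest of an item known to the server:
#   queue = FetchQueue(storage, cache)
#   queue.add(digest, size=1024)
#   queue.wait([digest])  # Blocks until the item is written into |cache|.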

class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, expected_size):
    assert stream is not None
    self.stream = stream
    self.expected_size = expected_size
    self.current_size = 0

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions
    since otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    if (is_last and
        (self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      raise IOError('Incorrect file size: expected %d, got %d' % (
          self.expected_size, self.current_size))

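# Illustrative sketch (not executed): FetchStreamVerifier checks the size only
# when the stream is exhausted, right before handing out the last chunk:
#   list(FetchStreamVerifier(iter(['abc', 'def']), 6).run())  # ['abc', 'def']
#   list(FetchStreamVerifier(iter(['abc', 'def']), 7).run())  # raises IOError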

class StorageApi(object):
  """Interface for classes that implement low-level storage operations.

  StorageApi is oblivious of the compression and hashing scheme used. These
  details are handled in the higher-level Storage class.

  Clients should generally not use StorageApi directly. Storage class is
  preferred since it implements compression and upload optimizations.
  """

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    raise NotImplementedError()

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    raise NotImplementedError()

  def fetch(self, digest, offset=0):
    """Fetches an object and yields its content.

    Arguments:
      digest: hash digest of item to download.
      offset: offset (in bytes) from the start of the file to resume fetch from.

    Yields:
      Chunks of downloaded item (as str objects).
    """
    raise NotImplementedError()

  def push(self, item, push_state, content=None):
    """Uploads an |item| with content generated by |content| generator.

    |item| MUST go through 'contains' call to get |push_state| before it can
    be pushed to the storage.

    To be clear, here is one possible usage:
      all_items = [... all items to push as Item subclasses ...]
      for missing_item, push_state in storage_api.contains(all_items).items():
        storage_api.push(missing_item, push_state)

    When pushing to a namespace with compression, data that should be pushed
    and data provided by the item is not the same. In that case |content| is
    not None and it yields chunks of compressed data (using item.content() as
    a source of original uncompressed data). This is implemented by Storage
    class.

    Arguments:
      item: Item object that holds information about an item being pushed.
      push_state: push state object as returned by 'contains' call.
      content: a generator that yields chunks to push, item.content() if None.

    Returns:
      None.
    """
    raise NotImplementedError()

  def contains(self, items):
    """Checks for |items| on the server, prepares missing ones for upload.

    Arguments:
      items: list of Item objects to check for presence.

    Returns:
      A dict missing Item -> opaque push state object to be passed to 'push'.
      See doc string for 'push'.
    """
    raise NotImplementedError()


class _IsolateServerPushState(object):
  """Per-item state passed from IsolateServer.contains to IsolateServer.push.

  Note this needs to be a global class to support pickling.
  """

  def __init__(self, preupload_status, size):
    self.preupload_status = preupload_status
    gs_upload_url = preupload_status.get('gs_upload_url') or None
    if gs_upload_url:
      self.upload_url = gs_upload_url
      self.finalize_url = '_ah/api/isolateservice/v1/finalize_gs_upload'
    else:
      self.upload_url = '_ah/api/isolateservice/v1/store_inline'
      self.finalize_url = None
    self.uploaded = False
    self.finalized = False
    self.size = size


class IsolateServer(StorageApi):
  """StorageApi implementation that downloads and uploads to Isolate Server.

  It uploads and downloads directly from Google Storage whenever appropriate.
  Works only within a single namespace.
  """

  def __init__(self, base_url, namespace):
    super(IsolateServer, self).__init__()
    assert file_path.is_url(base_url), base_url
    self._base_url = base_url.rstrip('/')
    self._namespace = namespace
    self._namespace_dict = {
        'compression': 'flate' if namespace.endswith(
            ('-gzip', '-flate')) else '',
        'digest_hash': 'sha-1',
        'namespace': namespace,
    }
    self._lock = threading.Lock()
    self._server_caps = None
    self._memory_use = 0

  @property
  def _server_capabilities(self):
    """Gets server details.

    Returns:
      Server capabilities dictionary as returned by /server_details endpoint.
    """
    # TODO(maruel): Make this request much earlier asynchronously while the
    # files are being enumerated.

    # TODO(vadimsh): Put |namespace| in the URL so that server can apply
    # namespace-level ACLs to this call.

    with self._lock:
      if self._server_caps is None:
        self._server_caps = net.url_read_json(
            url='%s/_ah/api/isolateservice/v1/server_details' % self._base_url,
            data={})
      return self._server_caps

  @property
  def location(self):
    return self._base_url

  @property
  def namespace(self):
    return self._namespace

  def fetch(self, digest, offset=0):
    assert offset >= 0
    source_url = '%s/_ah/api/isolateservice/v1/retrieve' % (
        self._base_url)
    logging.debug('download_file(%s, %d)', source_url, offset)
    response = self.do_fetch(source_url, digest, offset)

    if not response:
      raise IOError(
          'Attempted to fetch from %s; no data exist: %s / %s.' % (
              source_url, self._namespace, digest))

    # for DB uploads
    content = response.get('content')
    if content is not None:
      yield base64.b64decode(content)
      return

    # for GS entities
    connection = net.url_open(response['url'])
    if not connection:
      raise IOError('Failed to download %s / %s' % (self._namespace, digest))

    # If |offset|, verify the server respects it by checking Content-Range.
    if offset:
      content_range = connection.get_header('Content-Range')
      if not content_range:
        raise IOError('Missing Content-Range header')

      # 'Content-Range' format is 'bytes <offset>-<last_byte_index>/<size>'.
      # According to the spec, <size> can be '*' meaning "Total size of the
      # file is not known in advance".
      try:
        match = re.match(r'bytes (\d+)-(\d+)/(\d+|\*)', content_range)
        if not match:
          raise ValueError()
        content_offset = int(match.group(1))
        last_byte_index = int(match.group(2))
        size = None if match.group(3) == '*' else int(match.group(3))
      except ValueError:
        raise IOError('Invalid Content-Range header: %s' % content_range)

      # Ensure returned offset equals requested one.
      if offset != content_offset:
        raise IOError('Expecting offset %d, got %d (Content-Range is %s)' % (
            offset, content_offset, content_range))

      # Ensure entire tail of the file is returned.
      if size is not None and last_byte_index + 1 != size:
        raise IOError('Incomplete response. Content-Range: %s' % content_range)

    for data in connection.iter_content(NET_IO_FILE_CHUNK):
      yield data

  def push(self, item, push_state, content=None):
    assert isinstance(item, Item)
    assert item.digest is not None
    assert item.size is not None
    assert isinstance(push_state, _IsolateServerPushState)
    assert not push_state.finalized

    # Default to item.content().
    content = item.content() if content is None else content
    logging.info('Push state size: %d', push_state.size)
    if isinstance(content, (basestring, list)):
      # Memory is already used, too late.
      with self._lock:
        self._memory_use += push_state.size
    else:
      # TODO(vadimsh): Do not read from |content| generator when retrying push.
      # If |content| is indeed a generator, it cannot be rewound back to the
      # beginning of the stream. A retry will find it exhausted. A possible
      # solution is to wrap |content| generator with some sort of caching
      # restartable generator. It should be done alongside streaming support
      # implementation.
      #
      # In theory, we should keep the generator, so that it is not serialized
      # in memory. Sadly net.HttpService.request() requires the body to be
      # serialized.
      assert isinstance(content, types.GeneratorType), repr(content)
      slept = False
      # HACK HACK HACK. Please forgive me for my sins but OMG, it works!
      # One byte less than 512mb. This is to cope with incompressible content.
      max_size = int(sys.maxsize * 0.25)
      while True:
        with self._lock:
          # This is due to 32 bits python when uploading very large files. The
          # problem is that it's comparing uncompressed sizes, while we care
          # about compressed sizes since it's what is serialized in memory.
          # The first check assumes large files are compressible and that by
          # throttling one upload at once, we can survive. Otherwise, kaboom.
          memory_use = self._memory_use
          if ((push_state.size >= max_size and not memory_use) or
              (memory_use + push_state.size <= max_size)):
            self._memory_use += push_state.size
            memory_use = self._memory_use
            break
        time.sleep(0.1)
        slept = True
      if slept:
        logging.info('Unblocked: %d %d', memory_use, push_state.size)

    try:
      # This push operation may be a retry after a failed finalization call
      # below, no need to reupload contents in that case.
      if not push_state.uploaded:
        # PUT file to |upload_url|.
        success = self.do_push(push_state, content)
        if not success:
          raise IOError('Failed to upload file with hash %s to URL %s' % (
              item.digest, push_state.upload_url))
        push_state.uploaded = True
      else:
        logging.info(
            'A file %s already uploaded, retrying finalization only',
            item.digest)

      # Optionally notify the server that it's done.
      if push_state.finalize_url:
        # TODO(vadimsh): Calculate MD5 or CRC32C sum while uploading a file and
        # send it to isolated server. That way isolate server can verify that
        # the data safely reached Google Storage (GS provides MD5 and CRC32C of
        # stored files).
        # TODO(maruel): Fix the server to accept properly data={} so
        # url_read_json() can be used.
        response = net.url_read_json(
            url='%s/%s' % (self._base_url, push_state.finalize_url),
            data={
                'upload_ticket': push_state.preupload_status['upload_ticket'],
            })
        if not response or not response['ok']:
          raise IOError('Failed to finalize file with hash %s.' % item.digest)
        push_state.finalized = True
    finally:
      with self._lock:
        self._memory_use -= push_state.size

  def contains(self, items):
    # Ensure all items were initialized with 'prepare' call. Storage does that.
    assert all(i.digest is not None and i.size is not None for i in items)

    # Request body is a json encoded list of dicts.
    body = {
        'items': [
            {
              'digest': item.digest,
              'is_isolated': bool(item.high_priority),
              'size': item.size,
            } for item in items
        ],
        'namespace': self._namespace_dict,
    }

    query_url = '%s/_ah/api/isolateservice/v1/preupload' % self._base_url

    # Response body is a list of push_urls (or null if file is already present).
    response = None
    try:
      response = net.url_read_json(url=query_url, data=body)
      if response is None:
        raise isolated_format.MappingError(
            'Failed to execute preupload query')
    except ValueError as err:
      raise isolated_format.MappingError(
          'Invalid response from server: %s, body is %s' % (err, response))

    # Pick Items that are missing, attach _IsolateServerPushState to them.
    missing_items = {}
    for preupload_status in response.get('items', []):
      assert 'upload_ticket' in preupload_status, (
          preupload_status, '/preupload did not generate an upload ticket')
      index = int(preupload_status['index'])
      missing_items[items[index]] = _IsolateServerPushState(
          preupload_status, items[index].size)
    logging.info('Queried %d files, %d cache hit',
        len(items), len(items) - len(missing_items))
    return missing_items
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001134
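  # Example preupload exchange (illustrative sketch; digests, sizes and the
  # ticket values are hypothetical, and the exact JSON field encoding is an
  # assumption inferred from the code above). For two items where the server
  # already stores the first one, the response only lists the missing index:
  #
  #   request body:
  #     {'items': [{'digest': 'aaaa...', 'is_isolated': False, 'size': 123},
  #                {'digest': 'bbbb...', 'is_isolated': True, 'size': 456}],
  #      'namespace': {...}}
  #   response body:
  #     {'items': [{'index': '1', 'upload_ticket': '...'}]}
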
  def do_fetch(self, url, digest, offset):
    """Fetches isolated data from the URL.

    Used only for fetching files, not for API calls. Can be overridden in
    subclasses.

    Args:
      url: URL to fetch the data from, can possibly return http redirect.
      digest: hash digest of the item to fetch.
      offset: byte offset inside the file to start fetching from.

    Returns:
      net.HttpResponse compatible object, with 'read' and 'get_header' calls.
    """
    assert isinstance(offset, int)
    data = {
        'digest': digest.encode('utf-8'),
        'namespace': self._namespace_dict,
        'offset': offset,
    }
    # TODO(maruel): url + '?' + urllib.urlencode(data) once a HTTP GET endpoint
    # is added.
    return net.url_read_json(
        url=url,
        data=data,
        read_timeout=DOWNLOAD_READ_TIMEOUT)

  def do_push(self, push_state, content):
    """Uploads isolated file to the URL.

    Used only for storing files, not for API calls. Can be overridden in
    subclasses.

    Args:
      push_state: an _IsolateServerPushState instance.
      content: an iterable that yields 'str' chunks.
    """
    # A cheesy way to avoid memcpy of a (possibly huge) file, until streaming
    # upload support is implemented.
    if isinstance(content, list) and len(content) == 1:
      content = content[0]
    else:
      content = ''.join(content)

    # DB upload
    if not push_state.finalize_url:
      url = '%s/%s' % (self._base_url, push_state.upload_url)
      content = base64.b64encode(content)
      data = {
          'upload_ticket': push_state.preupload_status['upload_ticket'],
          'content': content,
      }
      response = net.url_read_json(url=url, data=data)
      return response is not None and response['ok']

    # upload to GS
    url = push_state.upload_url
    response = net.url_read(
        content_type='application/octet-stream',
        data=content,
        method='PUT',
        headers={'Cache-Control': 'public, max-age=31536000'},
        url=url)
    return response is not None


class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._linked = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  @property
  def linked(self):
    return self._linked[:]

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def read(self, digest):
    """Returns contents of the cached item as a single str."""
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()

  def link(self, digest, dest, file_mode, use_symlink):
    """Ensures file at |dest| has the same content as cached |digest|.

    If file_mode is provided, it is used to set the executable bit if
    applicable.

    The function may copy the content, create a hardlink and, if use_symlink is
    True, create a symlink if possible.

    Creating a tree of hardlinks has a few drawbacks:
    - tmpfs cannot be used for the scratch space. The tree has to be on the same
      partition as the cache.
    - involves a write to the inode, which advances ctime, causing a metadata
      writeback (and disk seeking).
    - cache ctime cannot be used to detect modifications / corruption.
    - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
      partition. This is why the function automatically falls back to copying
      the file content.
    - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
      hardlinks have the same owner.
    - Anecdotal reports suggest ext2 can be faulty under a high rate of
      hardlink creation.

    Creating a tree of symlinks has a few drawbacks:
    - Tasks running the equivalent of os.path.realpath() will get the naked path
      and may fail.
    - Windows:
      - Symlinks are reparse points:
        https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
        https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
      - Symbolic links are Win32 paths, not NT paths.
        https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
      - Symbolic links are supported on Windows 7 and later only.
      - SeCreateSymbolicLinkPrivilege is needed, which is not present by
        default.
      - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
        RID is present in the token;
        https://msdn.microsoft.com/en-us/library/bb530410.aspx
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. The default value makes
          all mapped files read-only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # self._evicted is a list of sizes; record the size of the content.
        self._evicted.append(len(v))

  def read(self, digest):
    with self._lock:
      try:
        return self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest

  def link(self, digest, dest, file_mode, use_symlink):
    """Since data is kept in memory, there is no file node to hardlink/symlink.
    """
    data = self.read(digest)
    file_write(dest, [data])
    if file_mode is not None:
      fs.chmod(dest, file_mode & self._file_mode_mask)
    with self._lock:
      self._linked.append(len(data))


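# Example (illustrative sketch; the destination path is hypothetical and the
# digest is simply sha-1('foo')): the LocalCache contract exercised through
# MemoryCache. write() returns the digest to allow chaining, read() raises
# CacheMiss for unknown digests, and link() materializes the content on disk:
#
#   cache = MemoryCache()
#   digest = '0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33'
#   assert cache.write(digest, ['foo']) == digest
#   assert digest in cache and cache.read(digest) == 'foo'
#   cache.link(digest, u'/tmp/out', 0400, use_symlink=False)

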
class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value. If
      0, the disk is filled unconditionally.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items


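# Example (sketch): policies matching the defaults exposed by
# add_cache_options() near the bottom of this file: 50GiB maximum cache size,
# 2GiB minimum free disk space, at most 100000 items:
#
#   policies = CachePolicies(
#       max_cache_size=50*1024*1024*1024,
#       min_free_space=2*1024*1024*1024,
#       max_items=100000)

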
class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as a json file.
  """
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      hash_algo: hashing algorithm used.
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in a LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    # Current cached free disk space. It is updated by self._trim().
    self._free_disk = 0
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent than _protected in the LRU
    # cache are also inherently protected. It could be a set() of all items
    # referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        # self._load() calls self._trim() which initializes self._free_disk.
        self._load()

  def __contains__(self, digest):
    with self._lock:
      return digest in self._lru

  def __enter__(self):
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    with tools.Profiler('CleanupTrimming'):
      with self._lock:
        self._trim()

      logging.info(
          '%5d (%8dkb) added',
          len(self._added), sum(self._added) / 1024)
      logging.info(
          '%5d (%8dkb) current',
          len(self._lru),
          sum(self._lru.itervalues()) / 1024)
      logging.info(
          '%5d (%8dkb) evicted',
          len(self._evicted), sum(self._evicted) / 1024)
      logging.info(
          ' %8dkb free',
          self._free_disk / 1024)
    return False

  def cached_set(self):
    with self._lock:
      return self._lru.keys_set()

  def cleanup(self):
    """Cleans up the cache directory.

    Ensures there are no unknown files in cache_dir.
    Ensures the read-only bits are set correctly.

    At this point the cache has already been loaded and trimmed to respect the
    cache policies.
    """
    fs.chmod(self.cache_dir, 0700)
    # Ensure that all files listed in the state still exist and add new ones.
    previous = self._lru.keys_set()
    # It'd be faster if there were a readdir() function.
    for filename in fs.listdir(self.cache_dir):
      if filename == self.STATE_FILE:
        fs.chmod(os.path.join(self.cache_dir, filename), 0600)
        continue
      if filename in previous:
        fs.chmod(os.path.join(self.cache_dir, filename), 0400)
        previous.remove(filename)
        continue

      # An untracked file. Delete it.
      logging.warning('Removing unknown file %s from cache', filename)
      p = self._path(filename)
      if fs.isdir(p):
        try:
          file_path.rmtree(p)
        except OSError:
          pass
      else:
        file_path.try_remove(p)
      continue

    if previous:
      # Filter out entries that were not found.
      logging.warning('Removed %d lost files', len(previous))
      for filename in previous:
        self._lru.pop(filename)

    # What remains to be done is to hash every single item to
    # detect corruption, then save to ensure state.json is up to date.
    # Sadly, on a 50GiB cache with 100MiB/s I/O, this is still over 8 minutes.
    # TODO(maruel): Let's revisit once directory metadata is stored in
    # state.json so only the files that had been mapped since the last cleanup()
    # call are manually verified.
    #
    #with self._lock:
    #  for digest in self._lru:
    #    if not isolated_format.is_valid_hash(
    #        self._path(digest), self.hash_algo):
    #      self.evict(digest)
    #      logging.info('Deleted corrupted item: %s', digest)

  def touch(self, digest, size):
    """Verifies an actual file is valid.

    Note that it doesn't compute the hash, so the item could still be corrupted
    if the file size didn't change.

    TODO(maruel): More stringent verification while keeping the check fast.
    """
    # Do the check outside the lock.
    if not is_valid_file(self._path(digest), size):
      return False

    # Update its LRU position.
    with self._lock:
      if digest not in self._lru:
        return False
      self._lru.touch(digest)
      self._protected = self._protected or digest
    return True

  def evict(self, digest):
    with self._lock:
      # Do not check for 'digest == self._protected' since it could be because
      # the object is corrupted.
      self._lru.pop(digest)
      self._delete_file(digest, UNKNOWN_FILE_SIZE)

  def read(self, digest):
    try:
      with fs.open(self._path(digest), 'rb') as f:
        return f.read()
    except IOError:
      raise CacheMiss(digest)

  def write(self, digest, content):
    assert content is not None
    with self._lock:
      self._protected = self._protected or digest
    path = self._path(digest)
    # A stale broken file may remain. It is possible for the file to have the
    # write access bit removed, which would cause the file_write() call to fail
    # to open in write mode. Take no chances here.
    file_path.try_remove(path)
    try:
      size = file_write(path, content)
    except:
      # There are two possible places where an exception can occur:
      #   1) Inside |content| generator in case of network or unzipping errors.
      #   2) Inside file_write itself in case of disk IO errors.
      # In any case delete an incomplete file and propagate the exception to
      # the caller, it will be logged there.
      file_path.try_remove(path)
      raise
    # Make the file read-only in the cache. This has a few side-effects since
    # the file node is modified, so every directory entry to this file becomes
    # read-only. It's fine here because it is a new file.
    file_path.set_read_only(path, True)
    with self._lock:
      self._add(digest, size)
    return digest

  def link(self, digest, dest, file_mode, use_symlink):
    """Links the file to |dest|.

    Note that the file permission bits are on the file node, not the directory
    entry, so changing the access bit on any of the directory entries for the
    file node will affect them all.
    """
    path = self._path(digest)
    mode = (
        file_path.SYMLINK_WITH_FALLBACK if use_symlink
        else file_path.HARDLINK_WITH_FALLBACK)
    if not file_path.link_file(dest, path, mode):
      # Report to the server that it failed with more details. We'll want to
      # squash them all.
      on_error.report('Failed to link\n%s -> %s' % (path, dest))

    if file_mode is not None:
      # Ignores all other bits.
      fs.chmod(dest, file_mode & 0500)
    with self._lock:
      self._linked.append(self._lru[digest])

  def _load(self):
    """Loads state of the cache from json file.

    If cache_dir does not exist on disk, it is created.
    """
    self._lock.assert_locked()

    if not fs.isfile(self.state_file):
      if not os.path.isdir(self.cache_dir):
        fs.makedirs(self.cache_dir)
    else:
      # Load state of the cache.
      try:
        self._lru = lru.LRUDict.load(self.state_file)
      except ValueError as err:
        logging.error('Failed to load cache state: %s' % (err,))
        # Don't want to keep broken state file.
        file_path.try_remove(self.state_file)
    self._trim()
    # We want the initial cache size after trimming, i.e. what is readily
    # available.
    self._initial_number_items = len(self._lru)
    self._initial_size = sum(self._lru.itervalues())
    if self._evicted:
      logging.info(
          'Trimming evicted items with the following sizes: %s',
          sorted(self._evicted))

  def _save(self):
    """Saves the LRU ordering."""
    self._lock.assert_locked()
    if sys.platform != 'win32':
      d = os.path.dirname(self.state_file)
      if fs.isdir(d):
        # Necessary otherwise the file can't be created.
        file_path.set_read_only(d, False)
    if fs.isfile(self.state_file):
      file_path.set_read_only(self.state_file, False)
    self._lru.save(self.state_file)

  def _trim(self):
    """Trims anything we don't know about and makes sure enough free space
    exists.
    """
    self._lock.assert_locked()

    # Ensure maximum cache size.
    if self.policies.max_cache_size:
      total_size = sum(self._lru.itervalues())
      while total_size > self.policies.max_cache_size:
        total_size -= self._remove_lru_file(True)

    # Ensure maximum number of items in the cache.
    if self.policies.max_items and len(self._lru) > self.policies.max_items:
      for _ in xrange(len(self._lru) - self.policies.max_items):
        self._remove_lru_file(True)

    # Ensure enough free space.
    self._free_disk = file_path.get_free_space(self.cache_dir)
    trimmed_due_to_space = 0
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      trimmed_due_to_space += 1
      self._remove_lru_file(True)

    if trimmed_due_to_space:
      total_usage = sum(self._lru.itervalues())
      usage_percent = 0.
      if total_usage:
        usage_percent = 100. * float(total_usage) / self.policies.max_cache_size

      logging.warning(
          'Trimmed %s file(s) due to insufficient free disk space: %.1fkb'
          ' free, %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
          trimmed_due_to_space,
          self._free_disk / 1024.,
          total_usage / 1024.,
          usage_percent,
          self.policies.max_cache_size / 1024.)
    self._save()

  def _path(self, digest):
    """Returns the path to one item."""
    return os.path.join(self.cache_dir, digest)

  def _remove_lru_file(self, allow_protected):
    """Removes the least recently used file and returns its size."""
    self._lock.assert_locked()
    try:
      digest, size = self._lru.get_oldest()
      if not allow_protected and digest == self._protected:
        raise Error('Not enough space to map the whole isolated tree')
    except KeyError:
      raise Error('Nothing to remove')
    digest, size = self._lru.pop_oldest()
    logging.debug('Removing LRU file %s', digest)
    self._delete_file(digest, size)
    return size

  def _add(self, digest, size=UNKNOWN_FILE_SIZE):
    """Adds an item into the LRU cache, marking it as the newest one."""
    self._lock.assert_locked()
    if size == UNKNOWN_FILE_SIZE:
      size = fs.stat(self._path(digest)).st_size
    self._added.append(size)
    self._lru.add(digest, size)
    self._free_disk -= size
    # Do a quicker version of self._trim(). It only enforces free disk space,
    # not cache size limits. It doesn't actually look at real free disk space,
    # only uses its cached values. self._trim() will be called later to enforce
    # real trimming, but doing this quick version here makes it possible to map
    # an isolated that is larger than the current amount of free disk space
    # when the cache size is already large.
    while (
        self.policies.min_free_space and
        self._lru and
        self._free_disk < self.policies.min_free_space):
      self._remove_lru_file(False)

  def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
    """Deletes cache file from the file system."""
    self._lock.assert_locked()
    try:
      if size == UNKNOWN_FILE_SIZE:
        size = fs.stat(self._path(digest)).st_size
      file_path.try_remove(self._path(digest))
      self._evicted.append(size)
      self._free_disk += size
    except OSError as e:
      logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))


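# Example (illustrative sketch; the paths are hypothetical and hashlib is
# assumed to be imported for the hash algorithm): DiskCache is meant to be
# used as a context manager so the trimming in __exit__() runs, mirroring what
# CMDdownload() below does:
#
#   with DiskCache(u'/abs/cache', CachePolicies(0, 0, 0), hashlib.sha1) as c:
#     if digest not in c:
#       c.write(digest, content_chunks)
#     c.link(digest, u'/abs/out/file', None, use_symlink=False)

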
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self):
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFiles whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait(pending)
      item = pending.pop(item_hash)
      item.load(fetch_queue.cache.read(item_hash))

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    logging.debug('fetch_files(%s)', isolated.obj_hash)
    for filepath, properties in isolated.data.get('files', {}).iteritems():
      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties
        if 'h' in properties:
          # Preemptively request files.
          logging.debug('fetching %s', filepath)
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
        self.command = tools.fix_python_path(self.command)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def set_storage_api_class(cls):
  """Replaces StorageApi implementation used by default."""
  global _storage_api_cls
  assert _storage_api_cls is None
  assert issubclass(cls, StorageApi)
  _storage_api_cls = cls


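# Example (sketch): tests can swap in an alternate backend before any storage
# object is created; get_storage_api() below instantiates it as
# cls(url, namespace). FakeStorageApi is hypothetical, not part of this file:
#
#   class FakeStorageApi(StorageApi):
#     ...  # override the StorageApi methods as needed
#
#   set_storage_api_class(FakeStorageApi)

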
def get_storage_api(url, namespace):
  """Returns an object that implements the low-level StorageApi interface.

  It is used by Storage to work with a single isolate |namespace|. It should
  rarely be used directly by clients, see 'get_storage' for a better
  alternative.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of StorageApi subclass.
  """
  cls = _storage_api_cls or IsolateServer
  return cls(url, namespace)


def get_storage(url, namespace):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    url: URL of isolate service to use shared cloud based storage.
    namespace: isolate namespace to operate in, also defines hashing and
        compression scheme used, i.e. namespace names that end with '-gzip'
        store compressed data.

  Returns:
    Instance of Storage.
  """
  return Storage(get_storage_api(url, namespace))


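# Example (sketch; the server URL is hypothetical): Storage is typically used
# as a context manager, as upload_tree() below does:
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     uploaded = storage.upload_items(items)  # items: list of FileItem

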
def upload_tree(base_url, infiles, namespace):
  """Uploads the given tree to the given url.

  Arguments:
    base_url: The url of the isolate server to upload to.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
    namespace: The namespace to use on the server.
  """
  # Convert |infiles| into a list of FileItem objects, skip duplicates.
  # Filter out symlinks, since they are not represented by items on isolate
  # server side.
  items = []
  seen = set()
  skipped = 0
  for filepath, metadata in infiles:
    assert isinstance(filepath, unicode), filepath
    if 'l' not in metadata and filepath not in seen:
      seen.add(filepath)
      item = FileItem(
          path=filepath,
          digest=metadata['h'],
          size=metadata['s'],
          high_priority=metadata.get('priority') == '0')
      items.append(item)
    else:
      skipped += 1

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(base_url, namespace) as storage:
    return storage.upload_items(items)


def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: LocalCache class that knows how to store and map files locally.
    outdir: Output directory to map file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.

  Returns:
    IsolatedBundle object that holds details about loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by namespace |storage| is using.
  algo = storage.hash_algo
  with cache:
    fetch_queue = FetchQueue(storage, cache)
    bundle = IsolatedBundle()

    with tools.Profiler('GetIsolateds'):
      # Optionally support local files by manually adding them to cache.
      if not isolated_format.is_valid_hash(isolated_hash, algo):
        logging.debug('%s is not a valid hash, assuming a file', isolated_hash)
        path = unicode(os.path.abspath(isolated_hash))
        try:
          isolated_hash = fetch_queue.inject_local_file(path, algo)
        except IOError:
          raise isolated_format.MappingError(
              '%s doesn\'t seem to be a valid file. Did you intend to pass a '
              'valid hash?' % isolated_hash)

      # Load all *.isolated and start loading rest of the files.
      bundle.fetch(fetch_queue, isolated_hash, algo)

    with tools.Profiler('GetRest'):
      # Create file system hierarchy.
      file_path.ensure_tree(outdir)
      create_directories(outdir, bundle.files)
      create_symlinks(outdir, bundle.files.iteritems())

      # Ensure working directory exists.
      cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
      file_path.ensure_tree(cwd)

      # Multimap: digest -> list of pairs (path, props).
      remaining = {}
      for filepath, props in bundle.files.iteritems():
        if 'h' in props:
          remaining.setdefault(props['h'], []).append((filepath, props))

      # Now block on the remaining files to be downloaded and mapped.
      logging.info('Retrieving remaining files (%d of them)...',
                   fetch_queue.pending_count)
      last_update = time.time()
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait(remaining)

          # Link corresponding files to a fetched item in cache.
          for filepath, props in remaining.pop(digest):
            dest = os.path.join(outdir, filepath)
            if os.path.exists(dest):
              raise AlreadyExists('File %s already exists' % dest)
            cache.link(digest, dest, props.get('m'), use_symlinks)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            print msg
            logging.info(msg)
            last_update = time.time()

    # Cache could evict some items we just tried to fetch, it's a fatal error.
    if not fetch_queue.verify_all_cached():
      raise isolated_format.MappingError(
          'Cache is too small to hold all requested files')
  return bundle


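# Example (illustrative sketch; the server URL and digest are hypothetical):
# the programmatic equivalent of `isolateserver.py download -s <hash>`:
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     bundle = fetch_isolated(
#         isolated_hash='4d134bc072212ace2df385dae143139da74ec0ef',
#         storage=storage,
#         cache=MemoryCache(),
#         outdir=u'/tmp/run',
#         use_symlinks=False)
#     print(' '.join(bundle.command))

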
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, '.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
      relpath: isolated_format.file_to_metadata(
          os.path.join(root, relpath), {}, 0, algo)
      for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
      FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          high_priority=relpath.endswith('.isolated'))
      for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise Error('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError:
        raise Error('Failed to process %s.' % f)
    uploaded = storage.upload_items(items_to_upload)
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)


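# Example (sketch; paths and server URL are hypothetical): uploading a mix of
# files and directories. The returned (hash, path) pairs mirror what
# CMDarchive prints:
#
#   with get_storage('https://isolate.example.com', 'default-gzip') as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'/src/out/tests', u'/src/run.isolated'],
#         lambda _relpath: False)  # a blacklist that omits nothing

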
def archive(out, namespace, files, blacklist):
  if files == ['-']:
    files = sys.stdin.readlines()

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Ignore stats.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The command outputs each file that was processed, with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except Error as e:
    parser.error(e.args[0])
  return 0


def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            UNKNOWN_FILE_SIZE,
            functools.partial(file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      with cache:
        bundle = fetch_isolated(
            isolated_hash=options.isolated,
            storage=storage,
            cache=cache,
            outdir=options.target,
            use_symlinks=options.use_symlinks)
      if bundle.command:
        # |rel| is already rooted in options.target, so print it directly.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0


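# Example invocations (sketch; the server URL and hash are hypothetical):
#
#   isolateserver.py archive -I https://isolate.example.com somedir
#   isolateserver.py download -I https://isolate.example.com \
#       -s 4d134bc072212ace2df385dae143139da74ec0ef \
#       --target ./out --cache ./cache

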
def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexps to use as a blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. There is no need to specify '
           'https://; it is assumed.')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server, or None when the option is
  unset and not required.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  try:
    options.isolate_server = net.fix_url(options.isolate_server)
  except ValueError as e:
    parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))

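# A sketch of the expected call order for the two helpers above, mirroring the
# download command; error handling and unrelated options are elided:
#
#   parser = OptionParserIsolateServer()
#   add_isolate_server_options(parser)
#   options, args = parser.parse_args(args)
#   process_isolate_server_options(parser, options, True, True)
#   with get_storage(options.isolate_server, options.namespace) as storage:
#     ...  # storage is ready for fetches and uploads.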

def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)
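# For reference, the defaults above amount to a 50 GiB size cap
# (50*1024**3 bytes), a 2 GiB free-disk floor and a 100,000 item limit.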

def process_cache_options(options):
  if options.cache:
    policies = CachePolicies(
        options.max_cache_size, options.min_free_space, options.max_items)

    # |options.cache| path may not exist until DiskCache() instance is created.
    return DiskCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        isolated_format.get_hash_algo(options.namespace))
  else:
    return MemoryCache()

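# A sketch of the intended lifecycle of the returned cache, mirroring the
# download command above; DiskCache enforces the CachePolicies limits while
# MemoryCache() provides the same interface when --cache is not given:
#
#   cache = process_cache_options(options)
#   cache.cleanup()
#   with cache:
#     bundle = fetch_isolated(..., cache=cache, ...)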

class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


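# subcommand.CommandDispatcher discovers the CMD-prefixed handler functions
# defined in this module and routes the first positional argument to the
# matching one, e.g. "download" reaches the download command implemented above.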
def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  file_path.enable_symlink()
  sys.exit(main(sys.argv[1:]))