#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.8.3'

import errno
import functools
import io
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import tempfile
import time
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import lru
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
from isolate_storage import Item


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# The file size to be used when we don't know the correct file size,
# generally used for .isolated files.
UNKNOWN_FILE_SIZE = None


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# larger the possibility it has changed). Then the first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/pre-upload',
# then the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are
# a trade-off; the more per request, the lower the effect of HTTP round trip
# latency and TCP-level chattiness. On the other hand, larger values cause
# longer lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousand files to look up with a minimal number of large files missing"
# case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


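# Illustrative check (not executed): the first pattern matches any '.pyc' or
# '.swp' file, the second matches a '.git' or '.svn' path component at any
# depth, e.g.:
#
#   re.match(DEFAULT_BLACKLIST[0], 'foo/bar.pyc')  # matches
#   re.match(DEFAULT_BLACKLIST[1], 'repo/.git')    # matches
#   re.match(DEFAULT_BLACKLIST[0], 'foo/bar.py')   # does not match

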
class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def file_write(path, content_generator):
  """Writes file content as generated by content_generator.

  Creates the intermediary directories as needed.

  Returns the number of bytes written.

  Meant to be mocked out in unit tests.
  """
  file_path.ensure_tree(os.path.dirname(path))
  total = 0
  with fs.open(path, 'wb') as f:
    for d in content_generator:
      total += len(d)
      f.write(d)
  return total


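# Usage sketch (illustrative; the paths are hypothetical): file_read() and
# file_write() compose into a chunked file copy that never holds the whole
# file in memory:
#
#   file_write(u'/abs/dst/file.bin', file_read(u'/abs/src/file.bin'))

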
def fileobj_path(fileobj):
  """Return file system path for file like object or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file like object was created by something outside our control (such
  # as the standard library, e.g. open("test.txt")), name will end up being a
  # str. We want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports suggest ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # The file is read only when no write bit is set for user, group or other.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


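# Usage sketch (illustrative; the paths are hypothetical): materialize a
# cached entry at a destination, read-only, letting putfile() pick hardlink
# versus copy:
#
#   with fs.open(u'/abs/cache/abc123', 'rb') as src:
#     putfile(src, u'/abs/run/out.bin', file_mode=0400)

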
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb file doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')


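# Round-trip sketch (illustrative): the two generators are inverses, so
# compressing then decompressing a chunked stream yields the original bytes:
#
#   chunks = ['hello ', 'world']
#   assert ''.join(zip_decompress(zip_compress(chunks))) == 'hello world'

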
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # Strip the leading dot from the extension returned by os.path.splitext() so
  # it can match the entries of ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lstrip('.').lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


def is_valid_file(path, size):
  """Determines if the given file appears valid.

  Currently it just checks the file exists and its size matches the
  expectation.
  """
  if size == UNKNOWN_FILE_SIZE:
    return fs.isfile(path)
  try:
    actual_size = fs.stat(path).st_size
  except OSError as e:
    logging.warning(
        'Can\'t read item %s, assuming it\'s invalid: %s',
        os.path.basename(path), e)
    return False
  if size != actual_size:
    logging.warning(
        'Found invalid item %s; %d != %d',
        os.path.basename(path), actual_size, size)
    return False
  return True


class FileItem(Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]


class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace) and not storage_api.internal_compression
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo

  @property
  def location(self):
    """URL of the backing store that this class is using."""
    return self._storage_api.location

  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used.
    """
    return self._storage_api.namespace

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of Item instances that represent data to upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    # For each digest keep only first Item that matches it. All other items
    # are just indistinguishable copies from the point of view of isolate
    # server (it doesn't care about paths at all, only content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total: %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with
    a 'get_missing_items' call. 'get_missing_items' returns a |push_state|
    object that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, self._hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific
            information describing how to upload the item (for example in
            case of cloud storage, it is signed upload URLs). It can later be
            passed to 'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self._hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


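# Usage sketch (illustrative; assumes |storage_api| is a StorageApi instance
# created via isolate_storage for the desired server and namespace):
#
#   with Storage(storage_api) as storage:
#     items = [FileItem(u'/abs/path/a.bin'), FileItem(u'/abs/path/b.bin')]
#     uploaded = storage.upload_items(items)  # uploads only the missing items

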
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries


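# Worked example (illustrative): with ITEMS_PER_CONTAINS_QUERIES of
# (20, 20, 50, 50, 50, 100), 200 items sorted by decreasing size are yielded
# as batches of 20, 20, 50, 50 and 50 items, with the remaining 10 items
# emitted as a final partial batch; anything beyond the schedule would
# continue in batches of 100.

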
class FetchQueue(object):
  """Fetches items from Storage and places them into LocalCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and LocalCache so that Storage and LocalCache don't depend on each
  other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = cache.cached_set()
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Item is corrupted, remove it from cache and fetch it again.
      self._fetched.remove(digest)
      self.cache.evict(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves. In the first wave it is called for
    HIGH priority items, the isolated files themselves. In the second wave it
    is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

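  # Usage sketch (illustrative; |digests| is a hypothetical list of hex
  # digests): enqueue fetches, then block until each waited-on item lands in
  # the cache:
  #
  #   queue = FetchQueue(storage, cache)
  #   for d in digests:
  #     queue.add(d)
  #     queue.wait_on(d)
  #   while not queue.wait_queue_empty:
  #     logging.debug('%s is ready', queue.wait())
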
  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    return self._accessed.issubset(self.cache.cached_set())


class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the LocalCache."""

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
      * stream: an iterable yielding chunks of content
      * hasher: an object from hashlib that supports update() and hexdigest()
        (eg, hashlib.sha1).
      * expected_digest: if the entire stream is piped through hasher and then
        summarized via hexdigest(), this should be the result. That is, it
        should be a hex string like 'abc123'.
      * expected_size: either the expected size of the stream, or
        UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)


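# Usage sketch (illustrative): verifying a two-chunk in-memory stream against
# its sha1 digest and size; run() re-yields the chunks and raises IOError on a
# size or digest mismatch:
#
#   import hashlib
#   expected = hashlib.sha1('data').hexdigest()
#   verifier = FetchStreamVerifier(['da', 'ta'], hashlib.sha1, expected, 4)
#   assert ''.join(verifier.run()) == 'data'

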
class CacheMiss(Exception):
  """Raised when an item is not in cache."""

  def __init__(self, digest):
    self.digest = digest
    super(CacheMiss, self).__init__(
        'Item with digest %r is not found in cache' % digest)


class LocalCache(object):
  """Local cache that stores objects fetched via Storage.

  It can be accessed concurrently from multiple threads, so it should protect
  its internal state with some lock.
  """
  cache_dir = None

  def __init__(self):
    self._lock = threading_utils.LockWithAssert()
    # Profiling values.
    self._added = []
    self._initial_number_items = 0
    self._initial_size = 0
    self._evicted = []
    self._used = []

  def __contains__(self, digest):
    raise NotImplementedError()

  def __enter__(self):
    """Context manager interface."""
    return self

  def __exit__(self, _exc_type, _exec_value, _traceback):
    """Context manager interface."""
    return False

  @property
  def added(self):
    return self._added[:]

  @property
  def evicted(self):
    return self._evicted[:]

  @property
  def used(self):
    return self._used[:]

  @property
  def initial_number_items(self):
    return self._initial_number_items

  @property
  def initial_size(self):
    return self._initial_size

  @property
  def number_items(self):
    """Returns the number of items in the cache."""
    raise NotImplementedError()

  @property
  def total_size(self):
    """Returns the total size of the cache in bytes."""
    raise NotImplementedError()

  def cached_set(self):
    """Returns a set of all cached digests (always a new object)."""
    raise NotImplementedError()

  def cleanup(self):
    """Deletes any corrupted item from the cache and trims it if necessary."""
    raise NotImplementedError()

  def touch(self, digest, size):
    """Ensures item is not corrupted and updates its LRU position.

    Arguments:
      digest: hash digest of item to check.
      size: expected size of this item.

    Returns:
      True if item is in cache and not corrupted.
    """
    raise NotImplementedError()

  def evict(self, digest):
    """Removes item from cache if it's there."""
    raise NotImplementedError()

  def getfileobj(self, digest):
    """Returns a readable file like object.

    If the file exists on the file system it will have a .name attribute with
    an absolute path to the file.
    """
    raise NotImplementedError()

  def write(self, digest, content):
    """Reads data from |content| generator and stores it in cache.

    Returns digest to simplify chaining.
    """
    raise NotImplementedError()

  def trim(self):
    """Enforces cache policies.

    Returns:
      Number of items evicted.
    """
    raise NotImplementedError()


class MemoryCache(LocalCache):
  """LocalCache implementation that stores everything in memory."""

  def __init__(self, file_mode_mask=0500):
    """Args:
      file_mode_mask: bit mask to AND file mode with. The default value makes
          all mapped files read only.
    """
    super(MemoryCache, self).__init__()
    self._file_mode_mask = file_mode_mask
    self._contents = {}

  def __contains__(self, digest):
    with self._lock:
      return digest in self._contents

  @property
  def number_items(self):
    with self._lock:
      return len(self._contents)

  @property
  def total_size(self):
    with self._lock:
      return sum(len(i) for i in self._contents.itervalues())

  def cached_set(self):
    with self._lock:
      return set(self._contents)

  def cleanup(self):
    pass

  def touch(self, digest, size):
    with self._lock:
      return digest in self._contents

  def evict(self, digest):
    with self._lock:
      v = self._contents.pop(digest, None)
      if v is not None:
        # |_evicted| is a list of evicted sizes, like |_added| and |_used|.
        self._evicted.append(len(v))

  def getfileobj(self, digest):
    with self._lock:
      try:
        d = self._contents[digest]
      except KeyError:
        raise CacheMiss(digest)
      self._used.append(len(d))
      return io.BytesIO(d)

  def write(self, digest, content):
    # Assemble whole stream before taking the lock.
    data = ''.join(content)
    with self._lock:
      self._contents[digest] = data
      self._added.append(len(data))
    return digest

  def trim(self):
    """Trimming is not implemented for MemoryCache."""
    return 0


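# Usage sketch (illustrative; the digest value is hypothetical): MemoryCache
# round-trips content keyed by digest and raises CacheMiss for unknown keys:
#
#   cache = MemoryCache()
#   cache.write('abc123', ['hel', 'lo'])
#   assert 'abc123' in cache
#   assert cache.getfileobj('abc123').read() == 'hello'

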
class CachePolicies(object):
  def __init__(self, max_cache_size, min_free_space, max_items):
    """
    Arguments:
    - max_cache_size: Trim if the cache gets larger than this value. If 0, the
      cache is effectively a leak.
    - min_free_space: Trim if disk free space becomes lower than this value.
      If 0, it unconditionally fills the disk.
    - max_items: Maximum number of items to keep in the cache. If 0, do not
      enforce a limit.
    - max_age_secs: Maximum age an item is kept in the cache until it is
      automatically evicted. Having a lot of dead luggage slows everything
      down.
    """
    self.max_cache_size = max_cache_size
    self.min_free_space = min_free_space
    self.max_items = max_items
    # 3 weeks. Make it configurable later if there is a use case, but for now
    # it's a fairly safe value. Think about lowering this value, likely to 1
    # week. In practice, a fair chunk of bots are already recycled on a daily
    # schedule so this code doesn't have any effect on them, unless they are
    # preloaded with a really old cache.
    self.max_age_secs = 21*24*60*60

  def __str__(self):
    return (
        'CachePolicies(cache=%dbytes, %d items; min_free_space=%d)') % (
            self.max_cache_size, self.max_items, self.min_free_space)


class DiskCache(LocalCache):
  """Stateful LRU cache in a flat hash table in a directory.

  Saves its state as a json file.
  """
  STATE_FILE = u'state.json'

  def __init__(self, cache_dir, policies, hash_algo, trim, time_fn=None):
    """
    Arguments:
      cache_dir: directory where to place the cache.
      policies: cache retention policies.
      hash_algo: hashing algorithm used.
      trim: if True, enforce |policies| right away.
        It can be done later by calling trim() explicitly.
    """
    # All protected methods (starting with '_') except _path should be called
    # with self._lock held.
    super(DiskCache, self).__init__()
    self.cache_dir = cache_dir
    self.policies = policies
    self.hash_algo = hash_algo
    self.state_file = os.path.join(cache_dir, self.STATE_FILE)
    # Items in a LRU lookup dict(digest: size).
    self._lru = lru.LRUDict()
    # Current cached free disk space. It is updated by self._trim().
    file_path.ensure_tree(self.cache_dir)
    self._free_disk = file_path.get_free_space(self.cache_dir)
    # The first item in the LRU cache that must not be evicted during this run
    # since it was referenced. All items more recent than _protected in the
    # LRU cache are also inherently protected. It could be a set() of all
    # items referenced but this increases memory usage without a use case.
    self._protected = None
    # Cleanup operations done by self._load(), if any.
    self._operations = []
    with tools.Profiler('Setup'):
      with self._lock:
        self._load(trim, time_fn)

nodirbe642ff2016-06-09 15:51:51 -07001185 def __contains__(self, digest):
1186 with self._lock:
1187 return digest in self._lru
1188
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001189 def __enter__(self):
1190 return self
1191
1192 def __exit__(self, _exc_type, _exc_value, _traceback):
1193 with tools.Profiler('CleanupTrimming'):
1194 with self._lock:
1195 self._trim()
1196
1197 logging.info(
1198 '%5d (%8dkb) added',
1199 len(self._added), sum(self._added) / 1024)
1200 logging.info(
1201 '%5d (%8dkb) current',
1202 len(self._lru),
1203 sum(self._lru.itervalues()) / 1024)
1204 logging.info(
maruel064c0a32016-04-05 11:47:15 -07001205 '%5d (%8dkb) evicted',
1206 len(self._evicted), sum(self._evicted) / 1024)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001207 logging.info(
1208 ' %8dkb free',
1209 self._free_disk / 1024)
1210 return False
1211
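# Example (editor's sketch, not part of the original file): typical use of
# DiskCache as a context manager; __exit__ above enforces the policies on
# the way out. The cache path and namespace are illustrative assumptions.
#
#   policies = CachePolicies(50*1024*1024*1024, 2*1024*1024*1024, 100000)
#   with DiskCache(u'/tmp/isolate_cache', policies,
#                  isolated_format.get_hash_algo('default-gzip'),
#                  trim=True) as cache:
#     print('%d items, %d bytes' % (cache.number_items, cache.total_size))
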
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001212 @property
1213 def number_items(self):
1214 with self._lock:
1215 return len(self._lru)
1216
1217 @property
1218 def total_size(self):
1219 with self._lock:
1220 return sum(self._lru.itervalues())
1221
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001222 def cached_set(self):
1223 with self._lock:
1224 return self._lru.keys_set()
1225
maruel36a963d2016-04-08 17:15:49 -07001226 def cleanup(self):
maruel2e8d0f52016-07-16 07:51:29 -07001227 """Cleans up the cache directory.
1228
1229 Ensures there are no unknown files in cache_dir.
1230 Ensures the read-only bits are set correctly.
1231
1232 By this point, the cache has already been loaded and trimmed to respect cache
1233 policies.
1234 """
Marc-Antoine Ruel51b83232017-11-13 14:58:31 -05001235 with self._lock:
1236 fs.chmod(self.cache_dir, 0700)
1237 # Ensure that all files listed in the state still exist and add new ones.
1238 previous = self._lru.keys_set()
1239 # It'd be faster if there were a readdir() function.
1240 for filename in fs.listdir(self.cache_dir):
1241 if filename == self.STATE_FILE:
1242 fs.chmod(os.path.join(self.cache_dir, filename), 0600)
1243 continue
1244 if filename in previous:
1245 fs.chmod(os.path.join(self.cache_dir, filename), 0400)
1246 previous.remove(filename)
1247 continue
1248
1249 # An untracked file. Delete it.
1250 logging.warning('Removing unknown file %s from cache', filename)
1251 p = self._path(filename)
1252 if fs.isdir(p):
1253 try:
1254 file_path.rmtree(p)
1255 except OSError:
1256 pass
1257 else:
1258 file_path.try_remove(p)
maruel2e8d0f52016-07-16 07:51:29 -07001259 continue
1260
Marc-Antoine Ruel51b83232017-11-13 14:58:31 -05001261 if previous:
1262 # Filter out entries that were not found.
1263 logging.warning('Removed %d lost files', len(previous))
1264 for filename in previous:
1265 self._lru.pop(filename)
1266 self._save()
maruel36a963d2016-04-08 17:15:49 -07001267
1268 # What remains to be done is to hash every single item to
1269 # detect corruption, then save to ensure state.json is up to date.
1270 # Sadly, on a 50GiB cache with 100MiB/s I/O, this is still over 8 minutes.
1271 # TODO(maruel): Let's revisit once directory metadata is stored in
1272 # state.json so only the files that had been mapped since the last cleanup()
1273 # call are manually verified.
1274 #
1275 #with self._lock:
1276 # for digest in self._lru:
1277 # if not isolated_format.is_valid_hash(
1278 # self._path(digest), self.hash_algo):
1279 # self.evict(digest)
1280 # logging.info('Deleted corrupted item: %s', digest)
1281
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001282 def touch(self, digest, size):
vadimsh129e5942017-01-04 16:42:46 -08001283 """Verifies an actual file is valid and bumps its LRU position.
1284
1285 Returns False if the file is missing or invalid. Doesn't kick it from LRU
1286 though (call 'evict' explicitly).
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001287
1288 Note that it doesn't compute the hash, so the file could still be corrupted if the
1289 file size didn't change.
1290
1291 TODO(maruel): More stringent verification while keeping the check fast.
1292 """
1293 # Do the check outside the lock.
1294 if not is_valid_file(self._path(digest), size):
1295 return False
1296
1297 # Update its LRU position.
1298 with self._lock:
1299 if digest not in self._lru:
1300 return False
1301 self._lru.touch(digest)
maruel2e8d0f52016-07-16 07:51:29 -07001302 self._protected = self._protected or digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001303 return True
1304
1305 def evict(self, digest):
1306 with self._lock:
maruel2e8d0f52016-07-16 07:51:29 -07001307 # Do not check for 'digest == self._protected' since the eviction may be
maruel083fa552016-04-08 14:38:01 -07001308 # happening precisely because the object is corrupted.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001309 self._lru.pop(digest)
1310 self._delete_file(digest, UNKNOWN_FILE_SIZE)
1311
tansell9e04a8d2016-07-28 09:31:59 -07001312 def getfileobj(self, digest):
nodir445097b2016-06-03 22:50:26 -07001313 try:
tansell9e04a8d2016-07-28 09:31:59 -07001314 f = fs.open(self._path(digest), 'rb')
1315 with self._lock:
1316 self._used.append(self._lru[digest])
1317 return f
nodir445097b2016-06-03 22:50:26 -07001318 except IOError:
1319 raise CacheMiss(digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001320
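# Example (editor's sketch, not part of the original file): reading an item
# back, with 'digest' and 'size' assumed to come from an .isolated entry
# ('h' and 's' keys). getfileobj() raises CacheMiss when the item is absent.
#
#   if cache.touch(digest, size):  # Cheap check: size only, not the hash.
#     with cache.getfileobj(digest) as f:
#       data = f.read()
#   else:
#     pass  # Missing or invalid: fetch it from Storage instead.
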
1321 def write(self, digest, content):
Marc-Antoine Rueldf4976d2015-04-15 19:56:21 -04001322 assert content is not None
maruel083fa552016-04-08 14:38:01 -07001323 with self._lock:
maruel2e8d0f52016-07-16 07:51:29 -07001324 self._protected = self._protected or digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001325 path = self._path(digest)
1326 # A stale broken file may remain. It is possible for the file to have its write
1327 # access bit removed, which would cause the file_write() call to fail to open
1328 # in write mode. Take no chances here.
1329 file_path.try_remove(path)
1330 try:
1331 size = file_write(path, content)
1332 except:
1333 # There are two possible places where an exception can occur:
1334 # 1) Inside |content| generator in case of network or unzipping errors.
1335 # 2) Inside file_write itself in case of disk IO errors.
1336 # In any case, delete the incomplete file and propagate the exception to the
1337 # caller; it will be logged there.
1338 file_path.try_remove(path)
1339 raise
1340 # Make the file read-only in the cache. This has a few side-effects since
1341 # the file node is modified, so every directory entry to this file becomes
1342 # read-only. It's fine here because it is a new file.
1343 file_path.set_read_only(path, True)
1344 with self._lock:
1345 self._add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001346 return digest
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001347
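# Example (editor's sketch, not part of the original file): write() consumes
# an iterable of chunks, which is what file_write() is assumed to expect
# (e.g. a decompression generator); a plain list of strings works too.
#
#   stored_digest = cache.write(digest, ['chunk1', 'chunk2'])
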
nodirf33b8d62016-10-26 22:34:58 -07001348 def get_oldest(self):
1349 """Returns digest of the LRU item or None."""
1350 try:
1351 return self._lru.get_oldest()[0]
1352 except KeyError:
1353 return None
1354
1355 def get_timestamp(self, digest):
1356 """Returns timestamp of last use of an item.
1357
1358 Raises KeyError if item is not found.
1359 """
1360 return self._lru.get_timestamp(digest)
1361
1362 def trim(self):
1363 """Forces retention policies."""
1364 with self._lock:
maruele6fc9382017-05-04 09:03:48 -07001365 return self._trim()
nodirf33b8d62016-10-26 22:34:58 -07001366
maruele6fc9382017-05-04 09:03:48 -07001367 def _load(self, trim, time_fn):
maruel2e8d0f52016-07-16 07:51:29 -07001368 """Loads state of the cache from json file.
1369
1370 If cache_dir does not exist on disk, it is created.
1371 """
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001372 self._lock.assert_locked()
1373
maruel2e8d0f52016-07-16 07:51:29 -07001374 if not fs.isfile(self.state_file):
1375 if not os.path.isdir(self.cache_dir):
1376 fs.makedirs(self.cache_dir)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001377 else:
maruel2e8d0f52016-07-16 07:51:29 -07001378 # Load state of the cache.
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001379 try:
1380 self._lru = lru.LRUDict.load(self.state_file)
1381 except ValueError as err:
1382 logging.error('Failed to load cache state: %s' % (err,))
1383 # Don't want to keep broken state file.
1384 file_path.try_remove(self.state_file)
maruele6fc9382017-05-04 09:03:48 -07001385 if time_fn:
1386 self._lru.time_fn = time_fn
nodirf33b8d62016-10-26 22:34:58 -07001387 if trim:
1388 self._trim()
maruel2e8d0f52016-07-16 07:51:29 -07001389 # We want the initial cache size after trimming, i.e. what is readily
1390 # available.
1391 self._initial_number_items = len(self._lru)
1392 self._initial_size = sum(self._lru.itervalues())
1393 if self._evicted:
1394 logging.info(
1395 'Trimming evicted items with the following sizes: %s',
1396 sorted(self._evicted))
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001397
1398 def _save(self):
1399 """Saves the LRU ordering."""
1400 self._lock.assert_locked()
1401 if sys.platform != 'win32':
1402 d = os.path.dirname(self.state_file)
maruel12e30012015-10-09 11:55:35 -07001403 if fs.isdir(d):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001404 # Necessary otherwise the file can't be created.
1405 file_path.set_read_only(d, False)
maruel12e30012015-10-09 11:55:35 -07001406 if fs.isfile(self.state_file):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001407 file_path.set_read_only(self.state_file, False)
1408 self._lru.save(self.state_file)
1409
1410 def _trim(self):
1411 """Trims anything we don't know, make sure enough free space exists."""
1412 self._lock.assert_locked()
1413
Marc-Antoine Ruel6fd96962018-04-18 15:55:52 -04001414 # Trim old items.
1415 cutoff = self._lru.time_fn() - self.policies.max_age_secs
1416 while self._lru:
1417 oldest = self._lru.get_oldest()
1418 if oldest[1][1] >= cutoff:
1419 break
1420 self._remove_lru_file(True)
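# Editor's note: with the default max_age_secs of 21*24*60*60 (1,814,400
# seconds), any item whose timestamp is more than three weeks old falls
# below 'cutoff' and is evicted by the loop above.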
1421
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001422 # Ensure maximum cache size.
1423 if self.policies.max_cache_size:
1424 total_size = sum(self._lru.itervalues())
1425 while total_size > self.policies.max_cache_size:
maruel2e8d0f52016-07-16 07:51:29 -07001426 total_size -= self._remove_lru_file(True)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001427
1428 # Ensure maximum number of items in the cache.
1429 if self.policies.max_items and len(self._lru) > self.policies.max_items:
1430 for _ in xrange(len(self._lru) - self.policies.max_items):
maruel2e8d0f52016-07-16 07:51:29 -07001431 self._remove_lru_file(True)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001432
1433 # Ensure enough free space.
1434 self._free_disk = file_path.get_free_space(self.cache_dir)
kjlubickea9abf02016-06-01 09:34:33 -07001435 trimmed_due_to_space = 0
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001436 while (
1437 self.policies.min_free_space and
1438 self._lru and
1439 self._free_disk < self.policies.min_free_space):
kjlubickea9abf02016-06-01 09:34:33 -07001440 trimmed_due_to_space += 1
maruel2e8d0f52016-07-16 07:51:29 -07001441 self._remove_lru_file(True)
kjlubickea9abf02016-06-01 09:34:33 -07001442
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001443 if trimmed_due_to_space:
1444 total_usage = sum(self._lru.itervalues())
1445 usage_percent = 0.
1446 if total_usage:
kjlubickea9abf02016-06-01 09:34:33 -07001447 usage_percent = 100. * float(total_usage) / self.policies.max_cache_size
1448
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001449 logging.warning(
kjlubickea9abf02016-06-01 09:34:33 -07001450 'Trimmed %s file(s) due to not enough free disk space: %.1fkb free,'
1451 ' %.1fkb cache (%.1f%% of its maximum capacity of %.1fkb)',
1452 trimmed_due_to_space,
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001453 self._free_disk / 1024.,
1454 total_usage / 1024.,
kjlubickea9abf02016-06-01 09:34:33 -07001455 usage_percent,
1456 self.policies.max_cache_size / 1024.)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001457 self._save()
maruele6fc9382017-05-04 09:03:48 -07001458 return trimmed_due_to_space
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001459
1460 def _path(self, digest):
1461 """Returns the path to one item."""
1462 return os.path.join(self.cache_dir, digest)
1463
maruel2e8d0f52016-07-16 07:51:29 -07001464 def _remove_lru_file(self, allow_protected):
1465 """Removes the lastest recently used file and returns its size."""
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001466 self._lock.assert_locked()
maruel083fa552016-04-08 14:38:01 -07001467 try:
nodireabc11c2016-10-18 16:37:28 -07001468 digest, (size, _) = self._lru.get_oldest()
maruel2e8d0f52016-07-16 07:51:29 -07001469 if not allow_protected and digest == self._protected:
Marc-Antoine Ruel211a9552018-01-17 16:47:29 -05001470 total_size = sum(self._lru.itervalues())+size
1471 msg = (
1472 'Not enough space to fetch the whole isolated tree.\n'
1473 ' %s\n cache=%dbytes, %d items; %sb free_space') % (
1474 self.policies, total_size, len(self._lru)+1, self._free_disk)
1475 raise Error(msg)
maruel083fa552016-04-08 14:38:01 -07001476 except KeyError:
1477 raise Error('Nothing to remove')
nodireabc11c2016-10-18 16:37:28 -07001478 digest, (size, _) = self._lru.pop_oldest()
vadimsh129e5942017-01-04 16:42:46 -08001479 logging.debug('Removing LRU file %s', digest)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001480 self._delete_file(digest, size)
1481 return size
1482
1483 def _add(self, digest, size=UNKNOWN_FILE_SIZE):
1484 """Adds an item into LRU cache marking it as a newest one."""
1485 self._lock.assert_locked()
1486 if size == UNKNOWN_FILE_SIZE:
maruel12e30012015-10-09 11:55:35 -07001487 size = fs.stat(self._path(digest)).st_size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001488 self._added.append(size)
1489 self._lru.add(digest, size)
maruel083fa552016-04-08 14:38:01 -07001490 self._free_disk -= size
1491 # Do a quicker version of self._trim(). It only enforces free disk space,
1492 # not cache size limits. It doesn't actually look at real free disk space,
1493 # only uses its cache values. self._trim() will be called later to enforce
1494 # real trimming but doing this quick version here makes it possible to map
1495 # an isolated that is larger than the current amount of free disk space when
1496 # the cache size is already large.
1497 while (
1498 self.policies.min_free_space and
1499 self._lru and
1500 self._free_disk < self.policies.min_free_space):
maruel2e8d0f52016-07-16 07:51:29 -07001501 self._remove_lru_file(False)
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001502
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001503 def _delete_file(self, digest, size=UNKNOWN_FILE_SIZE):
1504 """Deletes cache file from the file system."""
1505 self._lock.assert_locked()
1506 try:
1507 if size == UNKNOWN_FILE_SIZE:
vadimsh129e5942017-01-04 16:42:46 -08001508 try:
1509 size = fs.stat(self._path(digest)).st_size
1510 except OSError:
1511 size = 0
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001512 file_path.try_remove(self._path(digest))
maruel064c0a32016-04-05 11:47:15 -07001513 self._evicted.append(size)
maruel083fa552016-04-08 14:38:01 -07001514 self._free_disk += size
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001515 except OSError as e:
vadimsh129e5942017-01-04 16:42:46 -08001516 if e.errno != errno.ENOENT:
1517 logging.error('Error attempting to delete a file %s:\n%s' % (digest, e))
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001518
1519
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001520class IsolatedBundle(object):
1521 """Fetched and parsed .isolated file with all dependencies."""
1522
Vadim Shtayura3148e072014-09-02 18:51:52 -07001523 def __init__(self):
1524 self.command = []
1525 self.files = {}
1526 self.read_only = None
1527 self.relative_cwd = None
1528 # The main .isolated file, an IsolatedFile instance.
1529 self.root = None
1530
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001531 def fetch(self, fetch_queue, root_isolated_hash, algo):
1532 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001533
1534 It enables support for "included" .isolated files. They are processed in
1535 strict order but fetched asynchronously from the cache. This is important so
1536 that a file in an included .isolated file that is overridden by an embedding
1537 .isolated file is not fetched needlessly. The includes are fetched in one
1538 pass and the files are fetched as soon as all the ones on the left-side
1539 of the tree were fetched.
1540
1541 The prioritization is very important here for nested .isolated files.
1542 'includes' have the highest priority and the algorithm is optimized for both
1543 deep and wide trees. A deep one is a long link of .isolated files referenced
1544 one at a time by one item in 'includes'. A wide one has a large number of
1545 'includes' in a single .isolated file. 'left' is defined as an included
1546 .isolated file earlier in the 'includes' list. So the order of the elements
1547 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001548
1549 As a side effect this method starts asynchronous fetch of all data files
1550 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1551 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001552 """
1553 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1554
1555 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1556 pending = {}
1557 # Set of hashes of already retrieved items to refuse recursive includes.
1558 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001559 # Set of IsolatedFile instances whose data files have already been fetched.
1560 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001561
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001562 def retrieve_async(isolated_file):
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001563 """Retrieves an isolated file included by the root bundle."""
Vadim Shtayura3148e072014-09-02 18:51:52 -07001564 h = isolated_file.obj_hash
1565 if h in seen:
1566 raise isolated_format.IsolatedError(
1567 'IsolatedFile %s is retrieved recursively' % h)
1568 assert h not in pending
1569 seen.add(h)
1570 pending[h] = isolated_file
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001571 # This isolated item is being added dynamically, notify FetchQueue.
1572 fetch_queue.wait_on(h)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001573 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1574
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001575 # Start fetching root *.isolated file (single file, not the whole bundle).
1576 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001577
1578 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001579 # Wait until some *.isolated file is fetched, parse it.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001580 item_hash = fetch_queue.wait()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001581 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001582 with fetch_queue.cache.getfileobj(item_hash) as f:
1583 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001584
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001585 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001586 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001587 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001588
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001589 # Always fetch *.isolated files in traversal order, waiting if necessary
1590 # until next to-be-processed node loads. "Waiting" is done by yielding
1591 # back to the outer loop, that waits until some *.isolated is loaded.
1592 for node in isolated_format.walk_includes(self.root):
1593 if node not in processed:
1594 # Not visited, and not yet loaded -> wait for it to load.
1595 if not node.is_loaded:
1596 break
1597 # Not visited and loaded -> process it and continue the traversal.
1598 self._start_fetching_files(node, fetch_queue)
1599 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001600
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001601 # All *.isolated files should be processed by now and only them.
1602 all_isolateds = set(isolated_format.walk_includes(self.root))
1603 assert all_isolateds == processed, (all_isolateds, processed)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001604 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
Vadim Shtayura3148e072014-09-02 18:51:52 -07001605
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001606 # Extract 'command' and other bundle properties.
1607 for node in isolated_format.walk_includes(self.root):
1608 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001609 self.relative_cwd = self.relative_cwd or ''
1610
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001611 def _start_fetching_files(self, isolated, fetch_queue):
1612 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001613
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001614 Modifies self.files.
1615 """
maruel10bea7b2016-12-07 05:03:49 -08001616 files = isolated.data.get('files', {})
1617 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
1618 for filepath, properties in files.iteritems():
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001619 # Root isolated has priority on the files being mapped. In particular,
1620 # overridden files must not be fetched.
1621 if filepath not in self.files:
1622 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001623
1624 # Make sure that if the isolated file is read-only, the mode doesn't have write
1625 # bits.
1626 if 'm' in properties and self.read_only:
1627 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1628
1629 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001630 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001631 fetch_queue.add(
1632 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1633
1634 def _update_self(self, node):
1635 """Extracts bundle global parameters from loaded *.isolated file.
1636
1637 Will be called with each loaded *.isolated file in order of traversal of
1638 isolated include graph (see isolated_format.walk_includes).
1639 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001640 # Grabs properties.
1641 if not self.command and node.data.get('command'):
1642 # Ensure paths are correctly separated on windows.
1643 self.command = node.data['command']
1644 if self.command:
1645 self.command[0] = self.command[0].replace('/', os.path.sep)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001646 if self.read_only is None and node.data.get('read_only') is not None:
1647 self.read_only = node.data['read_only']
1648 if (self.relative_cwd is None and
1649 node.data.get('relative_cwd') is not None):
1650 self.relative_cwd = node.data['relative_cwd']
1651
1652
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001653def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001654 """Returns Storage class that can upload and download from |namespace|.
1655
1656 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001657 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001658 namespace: isolate namespace to operate in, also defines hashing and
1659 compression scheme used, i.e. namespace names that end with '-gzip'
1660 store compressed data.
1661
1662 Returns:
1663 Instance of Storage.
1664 """
aludwin81178302016-11-30 17:18:49 -08001665 return Storage(isolate_storage.get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001666
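# Example (editor's sketch, not part of the original file): obtaining a
# Storage bound to a namespace; the server URL is a placeholder.
#
#   with get_storage('https://isolateserver.appspot.com', 'default-gzip') as s:
#     uploaded = s.upload_items([])  # Pass FileItem objects here.
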
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001667
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001668def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001669 """Uploads the given tree to the given url.
1670
1671 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001672 base_url: The url of the isolate server to upload to.
1673 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001674 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001675 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001676 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001677 # Filter out symlinks, since they are not represented by items on the isolate
1678 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001679 items = []
1680 seen = set()
1681 skipped = 0
1682 for filepath, metadata in infiles:
maruel12e30012015-10-09 11:55:35 -07001683 assert isinstance(filepath, unicode), filepath
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001684 if 'l' not in metadata and filepath not in seen:
1685 seen.add(filepath)
1686 item = FileItem(
1687 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001688 digest=metadata['h'],
1689 size=metadata['s'],
1690 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001691 items.append(item)
1692 else:
1693 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001694
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001695 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001696 with get_storage(base_url, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001697 return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001698
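# Example (editor's sketch, not part of the original file): the shape of the
# (path, metadata) pairs upload_tree() expects; the digests and sizes below
# are invented and mirror the .isolated 'h'/'s'/'l' keys.
#
#   infiles = [
#       (u'/abs/out/foo.bin', {'h': 'deadbeef' * 5, 's': 1024}),
#       (u'/abs/out/link', {'l': u'foo.bin'}),  # Symlink: skipped on upload.
#   ]
#   upload_tree('https://isolateserver.appspot.com', infiles, 'default-gzip')
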
1699
maruel4409e302016-07-19 14:25:51 -07001700def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001701 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001702
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001703 Arguments:
1704 isolated_hash: hash of the root *.isolated file.
1705 storage: Storage class that communicates with isolate storage.
1706 cache: LocalCache class that knows how to store and map files locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001707 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001708 use_symlinks: Use symlinks instead of hardlinks when True.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001709
1710 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001711 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001712 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001713 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001714 'fetch_isolated(%s, %s, %s, %s, %s)',
1715 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001716 # Hash algorithm to use, defined by namespace |storage| is using.
1717 algo = storage.hash_algo
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001718 with cache:
1719 fetch_queue = FetchQueue(storage, cache)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001720 bundle = IsolatedBundle()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001721
1722 with tools.Profiler('GetIsolateds'):
1723 # Optionally support local files by manually adding them to cache.
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001724 if not isolated_format.is_valid_hash(isolated_hash, algo):
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001725 logging.debug('%s is not a valid hash, assuming a file '
1726 '(algo was %s, hash size was %d)',
1727 isolated_hash, algo(), algo().digest_size)
maruel1ceb3872015-10-14 06:10:44 -07001728 path = unicode(os.path.abspath(isolated_hash))
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001729 try:
maruel1ceb3872015-10-14 06:10:44 -07001730 isolated_hash = fetch_queue.inject_local_file(path, algo)
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001731 except IOError as e:
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001732 raise isolated_format.MappingError(
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001733 '%s doesn\'t seem to be a valid file. Did you intend to pass a '
Adrian Ludwinb4ebc092017-09-13 07:46:24 -04001734 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001735
1736 # Load all *.isolated and start loading rest of the files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001737 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001738
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001739 with tools.Profiler('GetRest'):
1740 # Create file system hierarchy.
nodire5028a92016-04-29 14:38:21 -07001741 file_path.ensure_tree(outdir)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001742 create_directories(outdir, bundle.files)
1743 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001744
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001745 # Ensure working directory exists.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001746 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
nodire5028a92016-04-29 14:38:21 -07001747 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001748
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001749 # Multimap: digest -> list of pairs (path, props).
1750 remaining = {}
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001751 for filepath, props in bundle.files.iteritems():
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001752 if 'h' in props:
1753 remaining.setdefault(props['h'], []).append((filepath, props))
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001754 fetch_queue.wait_on(props['h'])
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001755
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001756 # Now block on the remaining files to be downloaded and mapped.
1757 logging.info('Retrieving remaining files (%d of them)...',
1758 fetch_queue.pending_count)
1759 last_update = time.time()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001760 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001761 while remaining:
1762 detector.ping()
1763
1764 # Wait for any item to finish fetching to cache.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001765 digest = fetch_queue.wait()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001766
tansell9e04a8d2016-07-28 09:31:59 -07001767 # Create the files in the destination using item in cache as the
1768 # source.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001769 for filepath, props in remaining.pop(digest):
tansell9e04a8d2016-07-28 09:31:59 -07001770 fullpath = os.path.join(outdir, filepath)
1771
1772 with cache.getfileobj(digest) as srcfileobj:
tanselle4288c32016-07-28 09:45:40 -07001773 filetype = props.get('t', 'basic')
1774
1775 if filetype == 'basic':
1776 file_mode = props.get('m')
1777 if file_mode:
1778 # Ignore all bits apart from the user
1779 file_mode &= 0700
1780 putfile(
1781 srcfileobj, fullpath, file_mode,
1782 use_symlink=use_symlinks)
1783
tansell26de79e2016-11-13 18:41:11 -08001784 elif filetype == 'tar':
1785 basedir = os.path.dirname(fullpath)
Marc-Antoine Ruelffd80132017-12-04 16:00:02 -05001786 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1787 for ti in t:
tansell26de79e2016-11-13 18:41:11 -08001788 if not ti.isfile():
1789 logging.warning(
1790 'Path(%r) is nonfile (%s), skipped',
1791 ti.name, ti.type)
1792 continue
Marc-Antoine Ruelb45a94c2018-05-14 14:19:53 -04001793 # Handle files created on Windows fetched on POSIX and the
1794 # reverse.
1795 other_sep = '/' if os.path.sep == '\\' else '\\'
1796 name = ti.name.replace(other_sep, os.path.sep)
1797 fp = os.path.normpath(os.path.join(basedir, name))
tansell26de79e2016-11-13 18:41:11 -08001798 if not fp.startswith(basedir):
1799 logging.error(
1800 'Path(%r) is outside root directory',
1801 fp)
continue
Marc-Antoine Ruelffd80132017-12-04 16:00:02 -05001802 ifd = t.extractfile(ti)
tansell26de79e2016-11-13 18:41:11 -08001803 file_path.ensure_tree(os.path.dirname(fp))
1804 putfile(ifd, fp, 0700, ti.size)
1805
tanselle4288c32016-07-28 09:45:40 -07001806 else:
1807 raise isolated_format.IsolatedError(
1808 'Unknown file type %r', filetype)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001809
1810 # Report progress.
1811 duration = time.time() - last_update
1812 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1813 msg = '%d files remaining...' % len(remaining)
Marc-Antoine Ruel7ead0542018-04-04 19:41:44 -04001814 sys.stdout.write(msg + '\n')
1815 sys.stdout.flush()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001816 logging.info(msg)
1817 last_update = time.time()
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001818 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001819
1820 # Cache could evict some items we just tried to fetch, it's a fatal error.
1821 if not fetch_queue.verify_all_cached():
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001822 free_disk = file_path.get_free_space(cache.cache_dir)
1823 msg = (
1824 'Cache is too small to hold all requested files.\n'
1825 ' %s\n cache=%dbytes, %d items; %sb free_space') % (
1826 cache.policies, cache.total_size, cache.number_items, free_disk)
1827 raise isolated_format.MappingError(msg)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001828 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001829
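# Example (editor's sketch, not part of the original file): mapping a tree
# into an empty directory with an in-memory cache; the hash, URL and paths
# are placeholders.
#
#   with get_storage('https://isolateserver.appspot.com', 'default-gzip') as s:
#     bundle = fetch_isolated(
#         isolated_hash='0123456789abcdef0123456789abcdef01234567',
#         storage=s,
#         cache=MemoryCache(),
#         outdir=u'/tmp/run',
#         use_symlinks=False)
#   print(' '.join(bundle.command))
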
1830
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001831def directory_to_metadata(root, algo, blacklist):
1832 """Returns the FileItem list and .isolated metadata for a directory."""
1833 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001834 paths = isolated_format.expand_directory_and_symlink(
Marc-Antoine Ruel7a68f712017-12-01 18:45:18 -05001835 root, u'.' + os.path.sep, blacklist, sys.platform != 'win32')
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001836 metadata = {
1837 relpath: isolated_format.file_to_metadata(
kjlubick80596f02017-04-28 08:13:19 -07001838 os.path.join(root, relpath), {}, 0, algo, False)
Marc-Antoine Ruel92257792014-08-28 20:51:08 -04001839 for relpath in paths
1840 }
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001841 for v in metadata.itervalues():
1842 v.pop('t')
1843 items = [
1844 FileItem(
1845 path=os.path.join(root, relpath),
1846 digest=meta['h'],
1847 size=meta['s'],
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001848 high_priority=relpath.endswith('.isolated'))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001849 for relpath, meta in metadata.iteritems() if 'h' in meta
1850 ]
1851 return items, metadata
1852
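# Example (editor's sketch, not part of the original file): the directory and
# blacklist regexp are illustrative.
#
#   items, metadata = directory_to_metadata(
#       u'/abs/out/build.dir',
#       isolated_format.get_hash_algo('default-gzip'),
#       tools.gen_blacklist([r'.*\.pyc$']))
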
1853
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001854def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001855 """Stores every entry and returns the relevant data.
1856
1857 Arguments:
1858 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001859 files: list of file paths to upload. If a directory is specified, a
1860 .isolated file is created and its hash is returned.
1861 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07001862
1863 Returns:
1864 tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
1865 The first file in the first item is always the isolated file.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001866 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001867 assert all(isinstance(i, unicode) for i in files), files
1868 if len(files) != len(set(map(os.path.abspath, files))):
1869 raise Error('Duplicate entries found.')
1870
maruel064c0a32016-04-05 11:47:15 -07001871 # List of tuple(hash, path).
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001872 results = []
1873 # The temporary directory is only created as needed.
1874 tempdir = None
1875 try:
1876 # TODO(maruel): Yield the files to a worker thread.
1877 items_to_upload = []
1878 for f in files:
1879 try:
1880 filepath = os.path.abspath(f)
maruel12e30012015-10-09 11:55:35 -07001881 if fs.isdir(filepath):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001882 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001883 items, metadata = directory_to_metadata(
1884 filepath, storage.hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001885
1886 # Create the .isolated file.
1887 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04001888 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
1889 handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001890 os.close(handle)
1891 data = {
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001892 'algo':
1893 isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001894 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001895 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001896 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001897 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001898 h = isolated_format.hash_file(isolated, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001899 items_to_upload.extend(items)
1900 items_to_upload.append(
1901 FileItem(
1902 path=isolated,
1903 digest=h,
maruel12e30012015-10-09 11:55:35 -07001904 size=fs.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001905 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001906 results.append((h, f))
1907
maruel12e30012015-10-09 11:55:35 -07001908 elif fs.isfile(filepath):
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001909 h = isolated_format.hash_file(filepath, storage.hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001910 items_to_upload.append(
1911 FileItem(
1912 path=filepath,
1913 digest=h,
maruel12e30012015-10-09 11:55:35 -07001914 size=fs.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001915 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001916 results.append((h, f))
1917 else:
1918 raise Error('%s is neither a file nor a directory.' % f)
1919 except OSError:
1920 raise Error('Failed to process %s.' % f)
maruel064c0a32016-04-05 11:47:15 -07001921 uploaded = storage.upload_items(items_to_upload)
1922 cold = [i for i in items_to_upload if i in uploaded]
1923 hot = [i for i in items_to_upload if i not in uploaded]
1924 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001925 finally:
maruel12e30012015-10-09 11:55:35 -07001926 if tempdir and fs.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001927 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001928
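# Example (editor's sketch, not part of the original file): archiving a file
# and a directory in one call; the URL and paths are placeholders. For a
# directory entry, the returned hash is that of its generated .isolated.
#
#   with get_storage('https://isolateserver.appspot.com', 'default-gzip') as s:
#     results, cold, hot = archive_files_to_storage(
#         s, [u'/abs/out/build.dir', u'/abs/out/data.bin'], lambda _p: False)
#     for digest, path in results:
#       print('%s %s' % (digest, path))
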
1929
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001930def archive(out, namespace, files, blacklist):
1931 if files == ['-']:
1932 files = sys.stdin.readlines()
1933
1934 if not files:
1935 raise Error('Nothing to upload')
1936
1937 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001938 blacklist = tools.gen_blacklist(blacklist)
1939 with get_storage(out, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001940 # Ignore stats.
1941 results = archive_files_to_storage(storage, files, blacklist)[0]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001942 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1943
1944
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001945@subcommand.usage('<file1..fileN> or - to read from stdin')
1946def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001947 """Archives data to the server.
1948
1949 If a directory is specified, a .isolated file is created and the whole directory
1950 is uploaded. Then this .isolated file can be included in another one to run
1951 commands.
1952
1953 The command outputs each file that was processed with its content hash. For
1954 directories, the .isolated generated for the directory is listed as the
1955 directory entry itself.
1956 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001957 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001958 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001959 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001960 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001961 try:
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001962 archive(options.isolate_server, options.namespace, files, options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001963 except Error as e:
1964 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001965 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001966
1967
1968def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001969 """Download data from the server.
1970
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001971 It can either download individual files or a complete tree from a .isolated
1972 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001973 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001974 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001975 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05001976 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001977 help='hash of an isolated file, .isolated file content is discarded, use '
1978 '--file if you need it')
1979 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001980 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1981 help='hash and destination of a file, can be used multiple times')
1982 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001983 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001984 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07001985 parser.add_option(
1986 '--use-symlinks', action='store_true',
1987 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001988 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001989 options, args = parser.parse_args(args)
1990 if args:
1991 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruel5028ba22017-08-25 17:37:51 -04001992 if not file_path.enable_symlink():
1993 logging.error('Symlink support is not enabled')
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001994
nodir55be77b2016-05-03 09:39:57 -07001995 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001996 if bool(options.isolated) == bool(options.file):
1997 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07001998 if not options.cache and options.use_symlinks:
1999 parser.error('--use-symlinks requires the use of a cache with --cache')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00002000
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002001 cache = process_cache_options(options)
maruel2e8d0f52016-07-16 07:51:29 -07002002 cache.cleanup()
maruel12e30012015-10-09 11:55:35 -07002003 options.target = unicode(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002004 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07002005 if (fs.isfile(options.target) or
2006 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04002007 parser.error(
2008 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002009 with get_storage(options.isolate_server, options.namespace) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08002010 # Fetching individual files.
2011 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002012 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08002013 channel = threading_utils.TaskChannel()
2014 pending = {}
2015 for digest, dest in options.file:
Marc-Antoine Ruelcf51ea92017-10-24 17:28:43 -07002016 dest = unicode(dest)
Vadim Shtayura3172be52013-12-03 12:49:05 -08002017 pending[digest] = dest
2018 storage.async_fetch(
2019 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002020 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002021 digest,
Vadim Shtayura3148e072014-09-02 18:51:52 -07002022 UNKNOWN_FILE_SIZE,
Vadim Shtayura3172be52013-12-03 12:49:05 -08002023 functools.partial(file_write, os.path.join(options.target, dest)))
2024 while pending:
2025 fetched = channel.pull()
2026 dest = pending.pop(fetched)
2027 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002028
Vadim Shtayura3172be52013-12-03 12:49:05 -08002029 # Fetching whole isolated tree.
2030 if options.isolated:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002031 with cache:
2032 bundle = fetch_isolated(
2033 isolated_hash=options.isolated,
2034 storage=storage,
2035 cache=cache,
maruel4409e302016-07-19 14:25:51 -07002036 outdir=options.target,
2037 use_symlinks=options.use_symlinks)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002038 if bundle.command:
2039 rel = os.path.join(options.target, bundle.relative_cwd)
2040 print('To run this test please run from the directory %s:' %
2041 os.path.join(options.target, rel))
2042 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00002043
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002044 return 0
2045
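# Example invocations (editor's sketch, not part of the original file); the
# server URL and hash are placeholders:
#
#   isolateserver.py archive -I https://isolateserver.appspot.com \
#       --namespace default-gzip ./out/build.dir
#   isolateserver.py download -I https://isolateserver.appspot.com \
#       -s 0123456789abcdef0123456789abcdef01234567 \
#       -t ./download --cache /tmp/isolate_cache
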
2046
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05002047def add_archive_options(parser):
2048 parser.add_option(
2049 '--blacklist',
2050 action='append', default=list(DEFAULT_BLACKLIST),
2051 help='List of regexp to use as blacklist filter when uploading '
2052 'directories')
2053
2054
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002055def add_isolate_server_options(parser):
2056 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002057 parser.add_option(
2058 '-I', '--isolate-server',
2059 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002060 help='URL of the Isolate Server to use. Defaults to the environment '
2061 'variable ISOLATE_SERVER if set. No need to specify https://, this '
2062 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002063 parser.add_option(
aludwind7b7b7e2017-06-29 16:38:50 -07002064 '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
aludwin81178302016-11-30 17:18:49 -08002065 parser.add_option(
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002066 '--namespace', default='default-gzip',
2067 help='The namespace to use on the Isolate Server, default: %default')
2068
2069
nodir55be77b2016-05-03 09:39:57 -07002070def process_isolate_server_options(
2071 parser, options, set_exception_handler, required):
2072 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002073
2074 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002075 """
2076 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07002077 if required:
2078 parser.error('--isolate-server is required.')
2079 return
2080
aludwind7b7b7e2017-06-29 16:38:50 -07002081 if options.grpc_proxy:
2082 isolate_storage.set_grpc_proxy(options.grpc_proxy)
aludwin81178302016-11-30 17:18:49 -08002083 else:
2084 try:
2085 options.isolate_server = net.fix_url(options.isolate_server)
2086 except ValueError as e:
2087 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05002088 if set_exception_handler:
2089 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05002090 try:
2091 return auth.ensure_logged_in(options.isolate_server)
2092 except ValueError as e:
2093 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05002094
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05002095
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002096def add_cache_options(parser):
2097 cache_group = optparse.OptionGroup(parser, 'Cache management')
2098 cache_group.add_option(
2099 '--cache', metavar='DIR',
2100 help='Directory to keep a local cache of the files. Accelerates download '
2101 'by reusing already downloaded files. Default=%default')
2102 cache_group.add_option(
2103 '--max-cache-size',
2104 type='int',
2105 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08002106 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002107 help='Trim if the cache gets larger than this value, default=%default')
2108 cache_group.add_option(
2109 '--min-free-space',
2110 type='int',
2111 metavar='NNN',
2112 default=2*1024*1024*1024,
2113 help='Trim if disk free space becomes lower than this value, '
2114 'default=%default')
2115 cache_group.add_option(
2116 '--max-items',
2117 type='int',
2118 metavar='NNN',
2119 default=100000,
2120 help='Trim if more than this number of items are in the cache '
2121 'default=%default')
2122 parser.add_option_group(cache_group)
2123
2124
maruele6fc9382017-05-04 09:03:48 -07002125def process_cache_options(options, **kwargs):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002126 if options.cache:
2127 policies = CachePolicies(
2128 options.max_cache_size, options.min_free_space, options.max_items)
2129
2130 # |options.cache| path may not exist until DiskCache() instance is created.
2131 return DiskCache(
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04002132 unicode(os.path.abspath(options.cache)),
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002133 policies,
nodirf33b8d62016-10-26 22:34:58 -07002134 isolated_format.get_hash_algo(options.namespace),
maruele6fc9382017-05-04 09:03:48 -07002135 **kwargs)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04002136 else:
2137 return MemoryCache()
2138
2139
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002140class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002141 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002142 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05002143 self,
2144 version=__version__,
2145 prog=os.path.basename(sys.modules[__name__].__file__),
2146 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08002147 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002148
2149 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04002150 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002151 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08002152 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002153 return options, args
2154
2155
2156def main(args):
2157 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04002158 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00002159
2160
2161if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07002162 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00002163 fix_encoding.fix_encoding()
2164 tools.disable_buffering()
2165 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00002166 sys.exit(main(sys.argv[1:]))