#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand
from six.moves import queue as Queue

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
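# Worked example (editor's addition): for 300 items, the successive /pre-upload
# batches contain 20, 20, 50, 50, 50 and 100 items (290 total); once the table
# is exhausted its last entry keeps being reused, so the remaining 10 items go
# in one final batch. Batch number n (0-based) is sized:
#
#   ITEMS_PER_CONTAINS_QUERIES[min(n, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]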


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
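# Examples (editor's addition): 'foo.pyc' and 'bar.swp' match the first
# pattern; '.git' and os.path.join('a', 'b', '.svn') match the second.
# '.gitignore' matches neither since both patterns are anchored at the end.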


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data
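# Minimal usage sketch for file_read (editor's addition; the path is
# hypothetical):
#
#   for chunk in file_read(u'/abs/path/to/blob', chunk_size=64, offset=128):
#     process(chunk)  # At most 64 bytes per iteration.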


def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file like object was created using something like open("test.txt")
  # name will end up being a str (such as a function outside our control, like
  # the standard library). We want all our paths to be unicode objects, so we
  # decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime and causes a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner owns all hardlinks.
  - Anecdotal reports indicate ext2 may be faulty under a high rate of
    hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
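# Roundtrip sketch for the two helpers above (editor's addition):
#
#   compressed = list(zip_compress(iter(['hello ', 'world']), level=7))
#   assert ''.join(zip_decompress(iter(compressed))) == 'hello world'
#
# zip_decompress() raising IOError on truncated or trailing data is what lets
# Storage detect corrupted downloads.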


def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # splitext() keeps the leading dot; strip it so the lookup matches the
  # extension list above.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
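# Examples (editor's addition): _get_zip_compression_level('a/b.PNG') returns
# 0 since PNG data is already compressed; 'a/b.txt' and extension-less
# filenames return the default level 7.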


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""
  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()

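# Usage sketch for _ThreadFile (editor's addition): one thread writes, another
# iterates; TarBundle.content() below does exactly this with tarfile.
#
#   obj = _ThreadFile()
#   def _producer():
#     obj.write('data')
#     obj.close()          # close() unblocks the consumer's iteration.
#   threading.Thread(target=_producer).start()
#   assert list(obj) == ['data']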

class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, 0, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)


class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on
  the fly.
  """

  def __init__(self, root, algo):
    # 2 trailing 512 bytes headers.
    super(TarBundle, self).__init__(size=1024)
    self._items = []
    self._meta = None
    self._algo = algo
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Tries to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should
    be migrated to use the Go code instead.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # Bundling is currently disabled, see https://crbug.com/825418; everything
    # below this return is intentionally unreachable.
    return False
    self._size += rounded
    self._items.append(item)
    return True
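  # Editor's note: '(item.size + 512) & ~511' above rounds the entry up to the
  # next 512-byte tar block as a rough footprint estimate; e.g. a 100-byte
  # file counts as 512 bytes and a 512-byte file as 1024.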

  def yield_item_path_meta(self):
    """Yields tuples (Item, filepath, meta_dict).

    If the bundle contains fewer than 5 items, the individual items are
    yielded instead of the bundle itself.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
    def _tar_thread():
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
        'h': self.digest,
        's': self.size,
        't': u'tar',
    }


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]


class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
      if sys.maxsize <= max_size:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a generator of Item to the isolate server.

    It figures out which items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes them to the first thread.
    Then it join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread                      Thread #1
    (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                                Thread #2
         |         |
         v         v
    (missing)  (was on server)
         |
         v
    _handle_missing_thread                            Thread #3
         |
         v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
        upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing, to_upload
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.iteritems():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().iteritems():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        channel = threading_utils.TaskChannel()
        pending_upload = 0
        while not self._aborted:
          try:
            missing_item, push_state = missing.get(True, timeout=5)
            if missing_item is None:
              break
            self._async_push(channel, missing_item, push_state)
            pending_upload += 1
          except Queue.Empty:
            pass
          detector.ping()
          while not self._aborted and pending_upload:
            try:
              item = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
        while not self._aborted and pending_upload:
          item = channel.next()
          uploaded.append(item)
          pending_upload -= 1
          logging.debug(
              'Uploaded %d; %d pending: %s (%d)',
              len(uploaded), pending_upload, item.digest, item.size)

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point
      # of view of the isolate server (it doesn't care about paths at all,
      # only content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. This is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded
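  # Usage sketch (editor's addition; the server URL and namespace here are
  # hypothetical, not taken from this file):
  #
  #   server_ref = isolate_storage.ServerRef(
  #       'https://isolate.example.com', 'default-gzip')
  #   with get_storage(server_ref) as storage:
  #     items = [BufferItem('hello', server_ref.hash_algo)]
  #     uploaded = storage.upload_items(items)  # Only missing items are sent.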

  def _async_push(self, channel, item, push_state):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with
    a /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
        storage specific information describing how to upload the item (for
        example in case of cloud storage, it is signed upload URLs).

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
        storage specific information describing how to upload the item (for
        example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts an asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self.server_ref.is_with_compression:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(
            stream, self.server_ref.hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, the item is removed from the waited-on set and
    returned.

    This function is called in two waves. In the first wave it is called for
    HIGH priority items, the isolated files themselves. In the second wave it
    is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns the number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
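# Usage sketch for FetchStreamVerifier (editor's addition): consuming run()
# hashes and counts the chunks as a side effect and raises IOError on any
# mismatch.
#
#   import hashlib
#   expected = hashlib.sha1('hello').hexdigest()
#   verifier = FetchStreamVerifier(iter(['hello']), hashlib.sha1, expected, 5)
#   data = ''.join(verifier.run())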
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001072
1073
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001074class IsolatedBundle(object):
1075 """Fetched and parsed .isolated file with all dependencies."""
1076
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001077 def __init__(self, filter_cb):
1078 """
1079 filter_cb: callback function to filter downloaded content.
1080 When filter_cb is not None, Isolated file is downloaded iff
1081 filter_cb(filepath) returns True.
1082 """
1083
Vadim Shtayura3148e072014-09-02 18:51:52 -07001084 self.command = []
1085 self.files = {}
1086 self.read_only = None
1087 self.relative_cwd = None
1088 # The main .isolated file, a IsolatedFile instance.
1089 self.root = None
1090
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001091 self._filter_cb = filter_cb
1092
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001093 def fetch(self, fetch_queue, root_isolated_hash, algo):
1094 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001095
1096 It enables support for "included" .isolated files. They are processed in
1097 strict order but fetched asynchronously from the cache. This is important so
1098 that a file in an included .isolated file that is overridden by an embedding
1099 .isolated file is not fetched needlessly. The includes are fetched in one
1100 pass and the files are fetched as soon as all the ones on the left-side
1101 of the tree were fetched.
1102
1103 The prioritization is very important here for nested .isolated files.
1104 'includes' have the highest priority and the algorithm is optimized for both
1105 deep and wide trees. A deep one is a long link of .isolated files referenced
1106 one at a time by one item in 'includes'. A wide one has a large number of
1107 'includes' in a single .isolated file. 'left' is defined as an included
1108 .isolated file earlier in the 'includes' list. So the order of the elements
1109 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001110
1111 As a side effect this method starts asynchronous fetch of all data files
1112 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1113 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001114 """
1115 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1116
1117 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1118 pending = {}
1119 # Set of hashes of already retrieved items to refuse recursive includes.
1120 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001121 # Set of IsolatedFile's whose data files have already being fetched.
1122 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001123
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001124 def retrieve_async(isolated_file):
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001125 """Retrieves an isolated file included by the root bundle."""
Vadim Shtayura3148e072014-09-02 18:51:52 -07001126 h = isolated_file.obj_hash
1127 if h in seen:
1128 raise isolated_format.IsolatedError(
1129 'IsolatedFile %s is retrieved recursively' % h)
1130 assert h not in pending
1131 seen.add(h)
1132 pending[h] = isolated_file
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001133 # This isolated item is being added dynamically, notify FetchQueue.
1134 fetch_queue.wait_on(h)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001135 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1136
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001137 # Start fetching root *.isolated file (single file, not the whole bundle).
1138 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001139
1140 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001141 # Wait until some *.isolated file is fetched, parse it.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001142 item_hash = fetch_queue.wait()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001143 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001144 with fetch_queue.cache.getfileobj(item_hash) as f:
1145 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001146
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001147 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001148 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001149 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001150
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001151 # Always fetch *.isolated files in traversal order, waiting if necessary
1152 # until next to-be-processed node loads. "Waiting" is done by yielding
1153 # back to the outer loop, that waits until some *.isolated is loaded.
1154 for node in isolated_format.walk_includes(self.root):
1155 if node not in processed:
1156 # Not visited, and not yet loaded -> wait for it to load.
1157 if not node.is_loaded:
1158 break
1159 # Not visited and loaded -> process it and continue the traversal.
1160 self._start_fetching_files(node, fetch_queue)
1161 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001162
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001163 # All *.isolated files should be processed by now and only them.
1164 all_isolateds = set(isolated_format.walk_includes(self.root))
1165 assert all_isolateds == processed, (all_isolateds, processed)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001166 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
Vadim Shtayura3148e072014-09-02 18:51:52 -07001167
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001168 # Extract 'command' and other bundle properties.
1169 for node in isolated_format.walk_includes(self.root):
1170 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001171 self.relative_cwd = self.relative_cwd or ''
1172
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001173 def _start_fetching_files(self, isolated, fetch_queue):
1174 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001175
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001176 Modifies self.files.
1177 """
maruel10bea7b2016-12-07 05:03:49 -08001178 files = isolated.data.get('files', {})
1179 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
1180 for filepath, properties in files.iteritems():
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001181 if self._filter_cb and not self._filter_cb(filepath):
1182 continue
1183
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001184 # Root isolated has priority on the files being mapped. In particular,
1185 # overridden files must not be fetched.
1186 if filepath not in self.files:
1187 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001188
1189 # Make sure if the isolated is read only, the mode doesn't have write
1190 # bits.
1191 if 'm' in properties and self.read_only:
1192 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1193
1194 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001195 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001196 fetch_queue.add(
1197 properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def get_storage(server_ref):
  """Returns a Storage instance that can upload to and download from the
  server and namespace described by |server_ref|.

  Arguments:
    server_ref: isolate_storage.ServerRef instance.

  Returns:
    Instance of Storage.
  """
  assert isinstance(server_ref, isolate_storage.ServerRef), repr(server_ref)
  return Storage(isolate_storage.get_storage_api(server_ref))
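
# Illustrative usage sketch (comment only, not executed); the URL is a
# placeholder. Storage is a context manager, which is why the CMD* handlers
# below pair get_storage() with a `with` statement:
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     ...  # e.g. archive_files_to_storage(storage, files, blacklist)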


def _map_file(dst, digest, props, cache, read_only, use_symlinks):
  """Puts a downloaded file in place at its destination path.

  Used by fetch_isolated()'s thread pool to map files concurrently.
  """
  with cache.getfileobj(digest) as srcfileobj:
    filetype = props.get('t', 'basic')

    if filetype == 'basic':
      # Ignore all bits apart from the user.
      file_mode = (props.get('m') or 0o500) & 0o700
      if read_only:
        # Enforce read-only if the root bundle does.
        file_mode &= 0o500
      putfile(srcfileobj, dst, file_mode,
              use_symlink=use_symlinks)

    elif filetype == 'tar':
      basedir = os.path.dirname(dst)
      with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
        for ti in t:
          if not ti.isfile():
            logging.warning(
                'Path(%r) is nonfile (%s), skipped',
                ti.name, ti.type)
            continue
          # Handle files created on Windows fetched on POSIX and the
          # reverse.
          other_sep = '/' if os.path.sep == '\\' else '\\'
          name = ti.name.replace(other_sep, os.path.sep)
          fp = os.path.normpath(os.path.join(basedir, name))
          if not fp.startswith(basedir):
            logging.error(
                'Path(%r) is outside root directory',
                fp)
            # Do not extract entries that would escape the destination.
            continue
          ifd = t.extractfile(ti)
          file_path.ensure_tree(os.path.dirname(fp))
          file_mode = ti.mode & 0o700
          if read_only:
            # Enforce read-only if the root bundle does.
            file_mode &= 0o500
          putfile(ifd, fp, file_mode, ti.size)

    else:
      raise isolated_format.IsolatedError(
          'Unknown file type %r' % filetype)
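
# For reference, hedged sketches of the |props| dicts consumed above, as they
# appear under the 'files' key of a *.isolated file (digests and sizes are
# made up):
#   {'h': u'deadbeef...', 's': 1024, 'm': 0o644}   # plain file ('basic')
#   {'h': u'cafef00d...', 's': 8192, 't': 'tar'}   # files packed in a tarball
# 'h' is the content digest, 's' the size, 'm' the optional POSIX mode and
# 't' the file type, defaulting to 'basic' when absent.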


def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
                   filter_cb=None):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: ContentAddressedCache class that knows how to store and map files
        locally.
    outdir: Output directory to map the file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.
    filter_cb: filter that works as a whitelist for downloaded files.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.server_ref.hash_algo
  fetch_queue = FetchQueue(storage, cache)
  bundle = IsolatedBundle(filter_cb)

  with tools.Profiler('GetIsolateds'):
    # Optionally support local files by manually adding them to cache.
    if not isolated_format.is_valid_hash(isolated_hash, algo):
      logging.debug('%s is not a valid hash, assuming a file '
                    '(algo was %s, hash size was %d)',
                    isolated_hash, algo(), algo().digest_size)
      path = unicode(os.path.abspath(isolated_hash))
      try:
        isolated_hash = fetch_queue.inject_local_file(path, algo)
      except IOError as e:
        raise isolated_format.MappingError(
            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
            'valid hash (error: %s)?' % (isolated_hash, e))

    # Load all *.isolated files and start loading the rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)

  with tools.Profiler('GetRest'):
    # Create the file system hierarchy.
    file_path.ensure_tree(outdir)
    create_directories(outdir, bundle.files)
    _create_symlinks(outdir, bundle.files.iteritems())

    # Ensure the working directory exists.
    cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
    file_path.ensure_tree(cwd)

    # Multimap: digest -> list of pairs (path, props).
    remaining = {}
    for filepath, props in bundle.files.iteritems():
      if 'h' in props:
        remaining.setdefault(props['h'], []).append((filepath, props))
        fetch_queue.wait_on(props['h'])

    # Now block on the remaining files to be downloaded and mapped.
    logging.info('Retrieving remaining files (%d of them)...',
                 fetch_queue.pending_count)
    last_update = time.time()

    with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait()

          # Create the files in the destination using the item in cache as the
          # source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
                                         _map_file, fullpath, digest,
                                         props, cache, bundle.read_only,
                                         use_symlinks)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            sys.stdout.write(msg + '\n')
            sys.stdout.flush()
            logging.info(msg)
            last_update = time.time()
      assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
      putfile_thread_pool.join()

  # Save the cache right away to not lose the state of the new objects.
  cache.save()
  # The cache could evict some items we just tried to fetch; it's a fatal
  # error.
  if not fetch_queue.verify_all_cached():
    free_disk = file_path.get_free_space(cache.cache_dir)
    msg = (
        'Cache is too small to hold all requested files.\n'
        '  %s\n  cache=%dbytes, %d items; %sb free_space') % (
            cache.policies, cache.total_size, len(cache), free_disk)
    raise isolated_format.MappingError(msg)
  return bundle
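
# Illustrative usage sketch (comment only, not executed); the URL, hash and
# output directory are placeholders:
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   cache = local_caching.MemoryContentAddressedCache()
#   with get_storage(server_ref) as storage:
#     bundle = fetch_isolated(
#         isolated_hash='deadbeef' * 5,  # 40 hex chars for SHA-1.
#         storage=storage,
#         cache=cache,
#         outdir=u'/tmp/run',
#         use_symlinks=False)
#   # bundle.command and bundle.relative_cwd describe how to run the task.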


def _directory_to_metadata(root, algo, blacklist):
  """Yields every file and/or symlink found.

  Yields:
    tuple(FileItem, relpath, metadata)
    For a symlink, FileItem is None.
  """
  root = file_path.get_native_path_case(root)
  # Current tar file bundle, if any.
  bundle = TarBundle(root, algo)
  for relpath, issymlink in isolated_format.expand_directory_and_symlink(
      root,
      u'.' + os.path.sep,
      blacklist,
      follow_symlinks=(sys.platform != 'win32')):

    filepath = os.path.join(root, relpath)
    if issymlink:
      # TODO(maruel): Do not call this.
      meta = isolated_format.file_to_metadata(filepath, 0, False)
      yield None, relpath, meta
      continue

    prio = relpath.endswith('.isolated')
    if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
      # The file was added to the current pending tarball and won't be
      # archived individually.
      continue

    # Flush and reset the bundle.
    for i, p, m in bundle.yield_item_path_meta():
      yield i, p, m
    bundle = TarBundle(root, algo)

    # Yield the file individually.
    item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
    yield item, relpath, item.meta

  for i, p, m in bundle.yield_item_path_meta():
    yield i, p, m


def _print_upload_stats(items, missing):
  """Prints upload stats."""
  total = len(items)
  total_size = sum(f.size for f in items)
  logging.info(
      'Total:      %6d, %9.1fkiB', total, total_size / 1024.)
  cache_hit = set(items).difference(missing)
  cache_hit_size = sum(f.size for f in cache_hit)
  logging.info(
      'cache hit:  %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_hit),
      cache_hit_size / 1024.,
      len(cache_hit) * 100. / total,
      cache_hit_size * 100. / total_size if total_size else 0)
  cache_miss = missing
  cache_miss_size = sum(f.size for f in cache_miss)
  logging.info(
      'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_miss),
      cache_miss_size / 1024.,
      len(cache_miss) * 100. / total,
      cache_miss_size * 100. / total_size if total_size else 0)


def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
  """Called by archive_files_to_storage for a directory.

  Creates an .isolated file describing the directory's content.

  Yields:
    FileItem for every file found, plus one for the .isolated file itself.
  """
  files = {}
  for item, relpath, meta in _directory_to_metadata(
      dirpath, hash_algo, blacklist):
    # item is None for a symlink.
    files[relpath] = meta
    if item:
      yield item

  # TODO(maruel): If there's no file, don't yield an .isolated file.
  data = {
      'algo': hash_algo_name,
      'files': files,
      'version': isolated_format.ISOLATED_FILE_VERSION,
  }
  # Keep the file in memory. This is fine because .isolated files are
  # relatively small.
  yield BufferItem(
      tools.format_json(data, True), algo=hash_algo, high_priority=True)


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry into remote storage and returns stats.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: iterable of files to upload. If a directory is specified (with a
        trailing slash), a .isolated file is created and its hash is returned.
        Duplicates are skipped.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the .isolated file.
  """
  # Dict of path to hash.
  results = collections.OrderedDict()
  hash_algo = storage.server_ref.hash_algo
  hash_algo_name = storage.server_ref.hash_algo_name
  # Generator of FileItem to pass to the upload_items() concurrent operation.
  channel = threading_utils.TaskChannel()
  uploaded_digests = set()
  def _upload_items():
    results = storage.upload_items(channel)
    uploaded_digests.update(f.digest for f in results)
  t = threading.Thread(target=_upload_items)
  t.start()

  # Keep track locally of the items to determine cold and hot items.
  items_found = []
  try:
    for f in files:
      assert isinstance(f, unicode), repr(f)
      if f in results:
        # Duplicate.
        continue
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          item = None
          for item in _enqueue_dir(
              filepath, blacklist, hash_algo, hash_algo_name):
            channel.send_result(item)
            items_found.append(item)
          # The very last item will be the .isolated file.
          if not item:
            # There was no file in the directory.
            continue
        elif fs.isfile(filepath):
          item = FileItem(
              path=filepath,
              algo=hash_algo,
              size=None,
              high_priority=f.endswith('.isolated'))
          channel.send_result(item)
          items_found.append(item)
        else:
          raise Error('%s is neither a file nor a directory.' % f)
        results[f] = item.digest
      except OSError:
        raise Error('Failed to process %s.' % f)
  finally:
    # Stops the generator, so _upload_items() can exit.
    channel.send_done()
    t.join()

  cold = []
  hot = []
  for i in items_found:
    # Note that multiple FileItem may have the same .digest.
    if i.digest in uploaded_digests:
      cold.append(i)
    else:
      hot.append(i)
  return results, cold, hot
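
# Illustrative usage sketch (comment only, not executed); paths are
# placeholders. 'cold' holds the items that had to be uploaded, 'hot' the
# ones the server already had:
#
#   blacklist = tools.gen_blacklist(DEFAULT_BLACKLIST)
#   with get_storage(server_ref) as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'/path/to/dir/', u'/path/to/file'], blacklist)
#   # results maps each input path to its digest; for the directory, the
#   # digest is that of the generated .isolated file.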


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  if files == ['-']:
    files = (l.rstrip('\n\r') for l in sys.stdin)
  if not files:
    parser.error('Nothing to upload')
  files = (f.decode('utf-8') for f in files)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(server_ref) as storage:
      results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
  except (Error, local_caching.NoMoreSpace) as e:
    parser.error(e.args[0])
  print('\n'.join('%s %s' % (h, f) for f, h in results.iteritems()))
  return 0
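
# Example invocation (illustrative; the server URL is a placeholder):
#
#   python isolateserver.py archive -I https://isolate.example.com \
#       --namespace default-gzip /path/to/dir/
#
# Each processed entry is printed as '<hash> <path>'; for a directory, the
# hash is that of the generated .isolated file.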


def CMDdownload(parser, args):
  """Downloads data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.warning('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  with get_storage(server_ref) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(
                local_caching.file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.next()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching a whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print('  ' + ' '.join(bundle.command))

  return 0
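
# Example invocations (illustrative; the server URL and hashes are
# placeholders):
#
#   python isolateserver.py download -I https://isolate.example.com \
#       -s deadbeefdeadbeefdeadbeefdeadbeefdeadbeef -t out/
#
#   python isolateserver.py download -I https://isolate.example.com \
#       -f deadbeefdeadbeefdeadbeefdeadbeefdeadbeef payload.bin -t out/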


def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return None

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))
  return None


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR', default='cache',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, trim, **kwargs):
  if options.cache:
    policies = local_caching.CachePolicies(
        options.max_cache_size,
        options.min_free_space,
        options.max_items,
        # 3 weeks.
        max_age_secs=21*24*60*60)

    # |options.cache| path may not exist until DiskContentAddressedCache()
    # instance is created.
    return local_caching.DiskContentAddressedCache(
        unicode(os.path.abspath(options.cache)), policies, trim, **kwargs)
  return local_caching.MemoryContentAddressedCache()
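
# Illustrative wiring sketch (comment only, not executed): the CMD* handlers
# above combine these helpers roughly as follows.
#
#   parser = OptionParserIsolateServer()
#   add_isolate_server_options(parser)
#   add_cache_options(parser)
#   options, args = parser.parse_args(argv)
#   process_isolate_server_options(parser, options, True, True)
#   cache = process_cache_options(options, trim=True)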


class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))