#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import Queue
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
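
# Illustrative note (not in the original file): with the schedule above, the
# successive /pre-upload batches contain 20, 20, 50, 50, 50 and 100 items;
# once the last entry is reached, every further batch stays at 100 items
# (_create_items_batches_thread below clamps the index to the last entry).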


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file-like object was created using something like open("test.txt"),
  # possibly by a function outside our control (e.g. in the standard library),
  # its name will be a str. We want all our paths to be unicode objects, so we
  # decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports that ext2 is known to be potentially faulty on a high
    rate of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # Note: the file is read-only when none of the write bits are set.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
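
# Illustrative sketch (not part of the original file): copying a small
# in-memory buffer into place with putfile(); the destination path is
# hypothetical. A StringIO has no 'name', so putfile() takes the copy path.
#
#   import StringIO
#   buf = StringIO.StringIO('hello world')
#   putfile(buf, u'/tmp/example.txt', file_mode=0644)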


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
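
# Illustrative sketch (not part of the original file): the two generators
# above are inverses of each other.
#
#   blob = ''.join(zip_compress(['hello ', 'world']))
#   assert ''.join(zip_decompress([blob])) == 'hello world'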


def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot, so strip it before looking the
  # extension up in ALREADY_COMPRESSED_TYPES (whose entries have no dot).
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
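
# For example (illustrative): _get_zip_compression_level(u'photo.jpg')
# returns 0, while _get_zip_compression_level(u'main.cc') returns 7.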


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Builds the set of directories that need to be created.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)
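
# Illustrative sketch (not part of the original file): given
# create_directories(u'/tmp/out', [u'a/b/c.txt']), the directories /tmp/out/a
# and /tmp/out/a/b are created if missing.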


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""
  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()


class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, 0, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)
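
# Illustrative sketch (not part of the original file): the digest is computed
# lazily, on first access; the path is hypothetical.
#
#   import hashlib
#   item = FileItem(u'/path/to/file.bin', hashlib.sha1)
#   item.digest  # hashes the file content on first use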


class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on
  the fly.
  """

  def __init__(self, root, algo):
    # A tar archive is terminated by two trailing 512-byte blocks of zeros.
    super(TarBundle, self).__init__(size=1024)
    self._items = []
    self._meta = None
    self._algo = algo
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Tries to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should
    be migrated to use the Go code instead.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # Tarring is currently disabled; see https://crbug.com/825418.
    return False
    self._size += rounded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Yields tuples (Item, filepath, meta_dict).

    If the bundle contains fewer than 5 items, the individual items are
    yielded instead of the bundle itself.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
    def _tar_thread():
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
        'h': self.digest,
        's': self.size,
        't': u'tar',
    }


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]
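
# Illustrative sketch (not part of the original file): wrapping raw bytes so
# they can be uploaded like any other item.
#
#   import hashlib
#   item = BufferItem('{"version": 1}', hashlib.sha1, high_priority=True)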


class Storage(object):
  """Efficiently downloads or uploads large sets of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On a 32-bit userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a generator of Item objects to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes them to the first thread.
    Then it join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread                        Thread #1
    (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                                  Thread #2
          |         |
          v         v
    (missing)  (was on server)
          |
          v
    _handle_missing_thread                              Thread #3
          |
          v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
          upload.

    Returns:
      List of items that were uploaded. All other items are already there.
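
    Example (an illustrative sketch, not in the original docstring; the
    server URL, namespace and file path are hypothetical):

      server_ref = isolate_storage.ServerRef(
          'https://isolate.example.com', 'default-gzip')
      with get_storage(server_ref) as storage:
        item = FileItem(u'/path/to/file.bin', server_ref.hash_algo)
        uploaded = storage.upload_items([item])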
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing, to_upload
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.iteritems():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().iteritems():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        channel = threading_utils.TaskChannel()
        pending_upload = 0
        while not self._aborted:
          try:
            missing_item, push_state = missing.get(True, timeout=5)
            if missing_item is None:
              break
            self._async_push(channel, missing_item, push_state)
            pending_upload += 1
          except Queue.Empty:
            pass
          detector.ping()
          while not self._aborted and pending_upload:
            try:
              item = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
        while not self._aborted and pending_upload:
          item = channel.next()
          uploaded.append(item)
          pending_upload -= 1
          logging.debug(
              'Uploaded %d; %d pending: %s (%d)',
              len(uploaded), pending_upload, item.digest, item.size)

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point
      # of view of the isolate server (it doesn't care about paths at all,
      # only content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. This is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded

  def _async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self.server_ref.is_with_compression:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(
            stream, self.server_ref.hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for that have not yet been
    # returned by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, the item is removed from the waited-on set and
    returned.

    This function is called in two waves. In the first wave it is called for
    the HIGH priority items, the isolated files themselves. In the second
    wave it is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)
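
  # Illustrative sketch (not part of the original file): typical use,
  # mirroring IsolatedBundle.fetch(); the digest is hypothetical.
  #
  #   queue = FetchQueue(storage, cache)
  #   queue.wait_on('deadbeef')
  #   queue.add('deadbeef', size=123)
  #   done = queue.wait()  # returns 'deadbeef' once it is in the cache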


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
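
  # Illustrative sketch (not part of the original file): verifying a
  # one-chunk stream; a size or digest mismatch raises IOError in run().
  #
  #   import hashlib
  #   data = 'hello'
  #   verifier = FetchStreamVerifier(
  #       [data], hashlib.sha1, hashlib.sha1(data).hexdigest(), len(data))
  #   assert ''.join(verifier.run()) == data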


class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self, filter_cb):
    """
    filter_cb: callback function to filter downloaded content.
        When filter_cb is not None, a file is downloaded iff
        filter_cb(filepath) returns True.
    """
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

    self._filter_cb = filter_cb

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long link of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a
    large number of 'includes' in a single .isolated file. 'left' is defined
    as an included .isolated file earlier in the 'includes' list. So the
    order of the elements in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      """Retrieves an isolated file included by the root bundle."""
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      # This isolated item is being added dynamically, notify FetchQueue.
      fetch_queue.wait_on(h)
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait()
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      if self._filter_cb and not self._filter_cb(filepath):
        continue

      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure if the isolated is read only, the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on Windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1215
1216
def get_storage(server_ref):
  """Returns a Storage instance that can upload and download from |server_ref|.

  Arguments:
    server_ref: isolate_storage.ServerRef instance.

  Returns:
    Instance of Storage.
  """
  assert isinstance(server_ref, isolate_storage.ServerRef), repr(server_ref)
  return Storage(isolate_storage.get_storage_api(server_ref))


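# Illustrative usage sketch (not executed; the server URL below is a
# placeholder, not a value mandated by this module):
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     # storage can now be passed to fetch_isolated() or
#     # archive_files_to_storage().
#     pass
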
def _map_file(dst, digest, props, cache, read_only, use_symlinks):
  """Puts a downloaded file at its destination path.

  This function is called from multiple threads to map files concurrently.
  """
  with cache.getfileobj(digest) as srcfileobj:
    filetype = props.get('t', 'basic')

    if filetype == 'basic':
      # Ignore all bits apart from the user.
      file_mode = (props.get('m') or 0500) & 0700
      if read_only:
        # Enforce read-only if the root bundle does.
        file_mode &= 0500
      putfile(srcfileobj, dst, file_mode,
              use_symlink=use_symlinks)

    elif filetype == 'tar':
      basedir = os.path.dirname(dst)
      with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
        for ti in t:
          if not ti.isfile():
            logging.warning(
                'Path(%r) is nonfile (%s), skipped',
                ti.name, ti.type)
            continue
          # Handle files created on Windows fetched on POSIX and the
          # reverse.
          other_sep = '/' if os.path.sep == '\\' else '\\'
          name = ti.name.replace(other_sep, os.path.sep)
          fp = os.path.normpath(os.path.join(basedir, name))
          if not fp.startswith(basedir):
            logging.error(
                'Path(%r) is outside root directory',
                fp)
            # Skip the member instead of extracting outside the tree.
            continue
          ifd = t.extractfile(ti)
          file_path.ensure_tree(os.path.dirname(fp))
          file_mode = ti.mode & 0700
          if read_only:
            # Enforce read-only if the root bundle does.
            file_mode &= 0500
          putfile(ifd, fp, file_mode, ti.size)

    else:
      raise isolated_format.IsolatedError(
          'Unknown file type %r' % filetype)


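# Worked example of the tar member path handling above (illustrative values):
#
#   # On a POSIX host, os.path.sep == '/', so other_sep == '\\':
#   u'sub\\file.txt'.replace('\\', '/')      # -> u'sub/file.txt'
#   os.path.normpath('/out/a/sub/file.txt')  # stays under basedir '/out/a'
#   os.path.normpath('/out/a/../evil')       # -> '/out/evil': fails the
#                                            # startswith() check, skipped.
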
def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
                   filter_cb=None):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: ContentAddressedCache class that knows how to store and map files
           locally.
    outdir: Output directory to map file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.
    filter_cb: filter applied as a whitelist to the downloaded files.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.server_ref.hash_algo
  fetch_queue = FetchQueue(storage, cache)
  bundle = IsolatedBundle(filter_cb)

  with tools.Profiler('GetIsolateds'):
    # Optionally support local files by manually adding them to cache.
    if not isolated_format.is_valid_hash(isolated_hash, algo):
      logging.debug('%s is not a valid hash, assuming a file '
                    '(algo was %s, hash size was %d)',
                    isolated_hash, algo(), algo().digest_size)
      path = unicode(os.path.abspath(isolated_hash))
      try:
        isolated_hash = fetch_queue.inject_local_file(path, algo)
      except IOError as e:
        raise isolated_format.MappingError(
            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
            'valid hash (error: %s)?' % (isolated_hash, e))

    # Load all *.isolated and start loading the rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)

  with tools.Profiler('GetRest'):
    # Create the file system hierarchy.
    file_path.ensure_tree(outdir)
    create_directories(outdir, bundle.files)
    _create_symlinks(outdir, bundle.files.iteritems())

    # Ensure the working directory exists.
    cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
    file_path.ensure_tree(cwd)

    # Multimap: digest -> list of pairs (path, props).
    remaining = {}
    for filepath, props in bundle.files.iteritems():
      if 'h' in props:
        remaining.setdefault(props['h'], []).append((filepath, props))
        fetch_queue.wait_on(props['h'])

    # Now block on the remaining files to be downloaded and mapped.
    logging.info('Retrieving remaining files (%d of them)...',
                 fetch_queue.pending_count)
    last_update = time.time()

    with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait()

          # Create the files in the destination using the item in cache as
          # the source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
                                         _map_file, fullpath, digest,
                                         props, cache, bundle.read_only,
                                         use_symlinks)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            sys.stdout.write(msg + '\n')
            sys.stdout.flush()
            logging.info(msg)
            last_update = time.time()
      assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
      putfile_thread_pool.join()

  # Save the cache right away to not lose the state of the new objects.
  cache.save()
  # The cache could evict some items we just tried to fetch; it's a fatal
  # error.
  if not fetch_queue.verify_all_cached():
    free_disk = file_path.get_free_space(cache.cache_dir)
    msg = (
        'Cache is too small to hold all requested files.\n'
        '  %s\n  cache=%dbytes, %d items; %sb free_space') % (
          cache.policies, cache.total_size, len(cache), free_disk)
    raise isolated_format.MappingError(msg)
  return bundle


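# Illustrative usage sketch for fetch_isolated() (not executed; the URL, hash
# and output path are placeholders):
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   cache = local_caching.MemoryContentAddressedCache()
#   with get_storage(server_ref) as storage:
#     bundle = fetch_isolated(
#         isolated_hash=u'deadbeef' * 5,  # 40 hex chars for SHA-1.
#         storage=storage,
#         cache=cache,
#         outdir=u'/tmp/out',
#         use_symlinks=False)
#   # bundle.command and bundle.relative_cwd describe how to run the task.
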
def _directory_to_metadata(root, algo, blacklist):
  """Yields every file and/or symlink found.

  Yields:
    tuple(FileItem, relpath, metadata)
    For a symlink, FileItem is None.
  """
  root = file_path.get_native_path_case(root)
  # Current tar file bundle, if any.
  bundle = TarBundle(root, algo)
  for relpath, issymlink in isolated_format.expand_directory_and_symlink(
      root,
      u'.' + os.path.sep,
      blacklist,
      follow_symlinks=(sys.platform != 'win32')):

    filepath = os.path.join(root, relpath)
    if issymlink:
      # TODO(maruel): Do not call this.
      meta = isolated_format.file_to_metadata(filepath, 0, False)
      yield None, relpath, meta
      continue

    prio = relpath.endswith('.isolated')
    if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
      # The file was added to the current pending tarball and won't be
      # archived individually.
      continue

    # Flush and reset the bundle.
    for i, p, m in bundle.yield_item_path_meta():
      yield i, p, m
    bundle = TarBundle(root, algo)

    # Yield the file individually.
    item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
    yield item, relpath, item.meta

  for i, p, m in bundle.yield_item_path_meta():
    yield i, p, m


def _print_upload_stats(items, missing):
  """Prints upload stats."""
  total = len(items)
  total_size = sum(f.size for f in items)
  logging.info(
      'Total:      %6d, %9.1fkiB', total, total_size / 1024.)
  cache_hit = set(items).difference(missing)
  cache_hit_size = sum(f.size for f in cache_hit)
  logging.info(
      'cache hit:  %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_hit),
      cache_hit_size / 1024.,
      len(cache_hit) * 100. / total,
      cache_hit_size * 100. / total_size if total_size else 0)
  cache_miss = missing
  cache_miss_size = sum(f.size for f in cache_miss)
  logging.info(
      'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_miss),
      cache_miss_size / 1024.,
      len(cache_miss) * 100. / total,
      cache_miss_size * 100. / total_size if total_size else 0)


def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
  """Called by archive_files_to_storage for a directory.

  Creates an .isolated file describing the directory's content.

  Yields:
    FileItem for every file found, plus one for the .isolated file itself.
  """
  files = {}
  for item, relpath, meta in _directory_to_metadata(
      dirpath, hash_algo, blacklist):
    # item is None for a symlink.
    files[relpath] = meta
    if item:
      yield item

  # TODO(maruel): If there's no file, don't yield an .isolated file.
  data = {
      'algo': hash_algo_name,
      'files': files,
      'version': isolated_format.ISOLATED_FILE_VERSION,
  }
  # Keep the file in memory. This is fine because .isolated files are
  # relatively small.
  yield BufferItem(
      tools.format_json(data, True), algo=hash_algo, high_priority=True)


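# For reference, the in-memory .isolated document built above has this shape
# (the hash, size, mode and algo name are illustrative; 'sha-1' is merely a
# common value of hash_algo_name):
#
#   {
#     'algo': 'sha-1',
#     'files': {
#       u'src/main.py': {'h': u'<40 hex chars>', 's': 123, 'm': 0500},
#     },
#     'version': isolated_format.ISOLATED_FILE_VERSION,
#   }
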
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry into remote storage and returns stats.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: iterable of files to upload. If a directory is specified (with a
           trailing slash), a .isolated file is created and its hash is
           returned. Duplicates are skipped.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the .isolated file.
  """
  # Dict of path to hash.
  results = collections.OrderedDict()
  hash_algo = storage.server_ref.hash_algo
  hash_algo_name = storage.server_ref.hash_algo_name
  # Generator of FileItem to pass to upload_items() concurrent operation.
  channel = threading_utils.TaskChannel()
  uploaded_digests = set()
  def _upload_items():
    results = storage.upload_items(channel)
    uploaded_digests.update(f.digest for f in results)
  t = threading.Thread(target=_upload_items)
  t.start()

  # Keep track locally of the items to determine cold and hot items.
  items_found = []
  try:
    for f in files:
      assert isinstance(f, unicode), repr(f)
      if f in results:
        # Duplicate
        continue
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          item = None
          for item in _enqueue_dir(
              filepath, blacklist, hash_algo, hash_algo_name):
            channel.send_result(item)
            items_found.append(item)
          # The very last item will be the .isolated file.
          if not item:
            # There was no file in the directory.
            continue
        elif fs.isfile(filepath):
          item = FileItem(
              path=filepath,
              algo=hash_algo,
              size=None,
              high_priority=f.endswith('.isolated'))
          channel.send_result(item)
          items_found.append(item)
        else:
          raise Error('%s is neither a file nor a directory.' % f)
        results[f] = item.digest
      except OSError:
        raise Error('Failed to process %s.' % f)
  finally:
    # Stops the generator, so _upload_items() can exit.
    channel.send_done()
    t.join()

  cold = []
  hot = []
  for i in items_found:
    # Note that multiple FileItems may have the same .digest.
    if i.digest in uploaded_digests:
      cold.append(i)
    else:
      hot.append(i)
  return results, cold, hot


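# Illustrative sketch of driving archive_files_to_storage() directly (not
# executed; the URL and path are placeholders):
#
#   blacklist = tools.gen_blacklist(DEFAULT_BLACKLIST)
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'out/build/'], blacklist)
#   # results maps each input path to its digest; cold lists the items that
#   # actually had to be uploaded, hot the ones the server already had.
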
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. The .isolated file can then be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  if files == ['-']:
    files = (l.rstrip('\n\r') for l in sys.stdin)
  if not files:
    parser.error('Nothing to upload')
  files = (f.decode('utf-8') for f in files)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(server_ref) as storage:
      results, _cold, _hot = archive_files_to_storage(
          storage, files, blacklist)
  except (Error, local_caching.NoMoreSpace) as e:
    parser.error(e.args[0])
  print('\n'.join('%s %s' % (h, f) for f, h in results.iteritems()))
  return 0


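# Example invocations, assuming this module is saved as isolateserver.py and
# the server URL is a placeholder:
#
#   isolateserver.py archive -I https://isolate.example.com some_dir/
#   echo some_file | isolateserver.py archive -I https://isolate.example.com -
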
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, '
           'use --file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append',
      nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.warning('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  with get_storage(server_ref) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(
                local_caching.file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.next()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print('  ' + ' '.join(bundle.command))

  return 0


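# Example invocations, assuming this module is saved as isolateserver.py; the
# server URL and hash are placeholders:
#
#   isolateserver.py download -I https://isolate.example.com \
#       -s 1234567890123456789012345678901234567890 -t out/
#   isolateserver.py download -I https://isolate.example.com \
#       -f 1234567890123456789012345678901234567890 payload.bin
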
def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))
  return None


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR', default='cache',
      help='Directory to keep a local cache of the files. Accelerates '
           'download by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, trim, **kwargs):
  if options.cache:
    policies = local_caching.CachePolicies(
        options.max_cache_size,
        options.min_free_space,
        options.max_items,
        # 3 weeks.
        max_age_secs=21*24*60*60)

    # |options.cache| path may not exist until DiskContentAddressedCache()
    # instance is created.
    return local_caching.DiskContentAddressedCache(
        unicode(os.path.abspath(options.cache)), policies, trim, **kwargs)
  return local_caching.MemoryContentAddressedCache()


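# Sketch of the resulting behavior (illustrative): with options.cache set,
# e.g. a parsed command line of "--cache /tmp/isolate_cache", the function
# returns a DiskContentAddressedCache trimmed per the CachePolicies above;
# with an empty options.cache it returns a MemoryContentAddressedCache, so
# nothing persists between runs.
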
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))