#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand
import six
from six.moves import queue as Queue

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousands files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file-like object was created by something outside our control (e.g.
  # the standard library's open("test.txt")), name will end up being a str. We
  # want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, six.text_type):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports that ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
    assert fs.exists(dstpath)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE:
    # On Windows, modes other than removing stat.S_IWRITE are ignored. Return
    # early to skip the slow/unnecessary chmod call.
    return

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
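
# Illustrative round-trip sketch for the two helpers above (a comment-only
# example, not part of the original flow): compressing a chunk stream and then
# decompressing it should reproduce the input bytes.
#
#   chunks = [b'hello ', b'world']
#   compressed = list(zip_compress(iter(chunks)))
#   assert b''.join(zip_decompress(iter(compressed))) == b'hello world'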


def _get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  file_ext = os.path.splitext(filename)[1].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""

  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()
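
# _ThreadFile is a single-producer/single-consumer pipe: a writer thread calls
# write()/close() while the consumer iterates over the object to drain chunks,
# as done by TarBundle.content() below.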
349
350
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400351class FileItem(isolate_storage.Item):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800352 """A file to push to Storage.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000353
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800354 Its digest and size may be provided in advance, if known. Otherwise they will
355 be derived from the file content.
356 """
357
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +0000358 def __init__(self, path, algo, digest=None, size=None, high_priority=False):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800359 super(FileItem, self).__init__(
360 digest,
maruel12e30012015-10-09 11:55:35 -0700361 size if size is not None else fs.stat(path).st_size,
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +0000362 high_priority,
363 compression_level=_get_zip_compression_level(path))
364 self._path = path
365 self._algo = algo
366 self._meta = None
367
368 @property
369 def path(self):
370 return self._path
371
372 @property
373 def digest(self):
374 if not self._digest:
375 self._digest = isolated_format.hash_file(self._path, self._algo)
376 return self._digest
377
378 @property
379 def meta(self):
380 if not self._meta:
381 # TODO(maruel): Inline.
382 self._meta = isolated_format.file_to_metadata(self.path, 0, False)
383 # We need to hash right away.
384 self._meta['h'] = self.digest
385 return self._meta
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000386
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800387 def content(self):
388 return file_read(self.path)


class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on
  the fly.
  """

  def __init__(self, root, algo):
    # 2 trailing 512 bytes headers.
    super(TarBundle, self).__init__(size=1024)
    self._items = []
    self._meta = None
    self._algo = algo
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Tries to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should
    be migrated to use the Go code instead.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # Bundling is currently disabled; see https://crbug.com/825418.
    return False
    self._size += rounded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Yields tuples (Item, filepath, meta_dict).

    If the bundle contains fewer than 5 items, the individual items are
    yielded instead of the bundle itself.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
    def _tar_thread():
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
        'h': self.digest,
        's': self.size,
        't': u'tar',
    }


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]


class Storage(object):
  """Efficiently downloads or uploads large sets of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
      if sys.maxsize <= max_size:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items, verify_push=False):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes them to the first thread.
    Then it join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread                      Thread #1
    (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                                Thread #2
         |            |
         v            v
    (missing)   (was on server)
         |
         v
    _handle_missing_thread                            Thread #3
         |
         v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
          upload.
      verify_push: verify files are uploaded correctly by fetching from server.

    Returns:
      List of items that were uploaded. All other items are already there.

    Raises:
      The first exception being raised in the worker threads.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []
    exc_channel = threading_utils.TaskChannel()

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing, to_upload
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.items():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().items():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      try:
        with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
          channel = threading_utils.TaskChannel()
          pending_upload = 0
          while not self._aborted:
            try:
              missing_item, push_state = missing.get(True, timeout=5)
              if missing_item is None:
                break
              self._async_push(channel, missing_item, push_state, verify_push)
              pending_upload += 1
            except Queue.Empty:
              pass
            detector.ping()
            while not self._aborted and pending_upload:
              try:
                item = channel.next(timeout=0)
              except threading_utils.TaskChannel.Timeout:
                break
              uploaded.append(item)
              pending_upload -= 1
              logging.debug('Uploaded %d; %d pending: %s (%d)', len(uploaded),
                            pending_upload, item.digest, item.size)
          while not self._aborted and pending_upload:
            item = channel.next()
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
      except Exception:
        exc_channel.send_exception()

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point
      # of view of the isolate server (it doesn't care about paths at all,
      # only content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. That is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()
      exc_channel.send_done()
      for _ in exc_channel:
        # If there is no exception, this loop does nothing. Otherwise, it
        # raises the first exception put onto |exc_channel|.
        pass

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded

  def _async_push(self, channel, item, push_state, verify_push=False):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with
    a /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).
      verify_push: verify files are uploaded correctly by fetching from server.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      if verify_push:
        self._fetch(
            item.digest,
            item.size,
            # This consumes all elements from the given generator.
            lambda gen: collections.deque(gen, maxlen=0))
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # the content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        # In Python3, zlib.compress returns a bytes object instead of str.
        data = six.b('').join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def _fetch(self, digest, size, sink):
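    """Fetches one item synchronously and feeds its verified content to |sink|.

    |sink| is called as sink(generator); consuming the generator raises IOError
    if the size or digest doesn't match.
    """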
    try:
      # Prepare reading pipeline.
      stream = self._storage_api.fetch(digest, size, 0)
      if self.server_ref.is_with_compression:
        stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
      # Run |stream| through verifier that will assert its size.
      verifier = FetchStreamVerifier(stream, self.server_ref.hash_algo, digest,
                                     size)
      # Verified stream goes to |sink|.
      sink(verifier.run())
    except Exception as err:
      logging.error('Failed to fetch %s: %s', digest, err)
      raise

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      self._fetch(digest, size, sink)
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000932
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400933 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700934 self,
935 digest,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400936 size=local_caching.UNKNOWN_FILE_SIZE,
Vadim Shtayura3148e072014-09-02 18:51:52 -0700937 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000938 """Starts asynchronous fetch of item |digest|."""
939 # Fetching it now?
940 if digest in self._pending:
941 return
942
943 # Mark this file as in use, verify_all_cached will later ensure it is still
944 # in cache.
945 self._accessed.add(digest)
946
947 # Already fetched? Notify cache to update item's LRU position.
948 if digest in self._fetched:
949 # 'touch' returns True if item is in cache and not corrupted.
950 if self.cache.touch(digest, size):
951 return
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +0000952 logging.error('%s is corrupted', digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000953 self._fetched.remove(digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000954
955 # TODO(maruel): It should look at the free disk space, the current cache
956 # size and the size of the new item on every new item:
957 # - Trim the cache as more entries are listed when free disk space is low,
958 # otherwise if the amount of data downloaded during the run > free disk
959 # space, it'll crash.
960 # - Make sure there's enough free disk space to fit all dependencies of
961 # this run! If not, abort early.
962
963 # Start fetching.
964 self._pending.add(digest)
965 self.storage.async_fetch(
966 self._channel, priority, digest, size,
967 functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves. In the first wave it is called for
    HIGH priority items, the isolated files themselves. In the second wave it
    is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to the assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns the number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)
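
# Typical FetchQueue flow, as exercised by IsolatedBundle.fetch() below
# (an illustrative comment-only sketch):
#
#   fetch_queue.wait_on(digest)    # register interest in |digest|
#   fetch_queue.add(digest, size)  # start the asynchronous fetch
#   done = fetch_queue.wait()      # block until a waited-on digest is fetched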


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions
    since otherwise Storage will retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
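
# Minimal FetchStreamVerifier sketch (illustrative; assumes hashlib.sha1 is
# the namespace's hash algorithm):
#
#   import hashlib
#   verifier = FetchStreamVerifier(stream, hashlib.sha1, expected_digest,
#                                  expected_size)
#   for chunk in verifier.run():
#     consume(chunk)  # raises IOError on size or digest mismatch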


class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self, filter_cb):
    """
    filter_cb: callback function to filter downloaded content.
        When filter_cb is not None, a file is downloaded iff
        filter_cb(filepath) returns True.
    """
    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

    self._filter_cb = filter_cb

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long link of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts an asynchronous fetch of all data
    files by adding them to |fetch_queue|. It doesn't wait for data files to
    finish fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      """Retrieves an isolated file included by the root bundle."""
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      # This isolated item is being added dynamically, notify FetchQueue.
      fetch_queue.wait_on(h)
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching the root *.isolated file (single file, not the whole
    # bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait()
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, that waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''
1204

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.items():
      if self._filter_cb and not self._filter_cb(filepath):
        continue

      # The root isolated has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure that if the isolated is read only, the mode doesn't have
        # write bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grab properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on Windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def get_storage(server_ref):
  """Returns a Storage instance that can upload and download from |server_ref|.

  Arguments:
    server_ref: isolate_storage.ServerRef instance.

  Returns:
    Instance of Storage.
  """
  # Handle the specific internal use case.
  assert (isinstance(server_ref, isolate_storage.ServerRef) or
          type(server_ref).__name__ == 'ServerRef'), repr(server_ref)
  return Storage(isolate_storage.get_storage_api(server_ref))
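
# A minimal usage sketch for get_storage(); the server URL is a placeholder:
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     ...  # e.g. storage.upload_items(...) or storage.async_fetch(...)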


def _map_file(dst, digest, props, cache, read_only, use_symlinks):
  """Puts a downloaded file at its destination path.

  Used by the multi-threaded file mapping in fetch_isolated().
  """
  with tools.Profiler("_map_file for %s" % dst):
    with cache.getfileobj(digest) as srcfileobj:
      filetype = props.get('t', 'basic')

      if filetype == 'basic':
        # Ignore all bits apart from the user.
        file_mode = (props.get('m') or 0o500) & 0o700
        if read_only:
          # Enforce read-only if the root bundle does.
          file_mode &= 0o500
        putfile(srcfileobj, dst, file_mode, use_symlink=use_symlinks)

      elif filetype == 'tar':
        basedir = os.path.dirname(dst)
        with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
          ensured_dirs = set()
          for ti in t:
            if not ti.isfile():
              logging.warning('Path(%r) is nonfile (%s), skipped', ti.name,
                              ti.type)
              continue
            # Handle files created on Windows fetched on POSIX and the
            # reverse.
            other_sep = '/' if os.path.sep == '\\' else '\\'
            name = ti.name.replace(other_sep, os.path.sep)
            fp = os.path.normpath(os.path.join(basedir, name))
            if not fp.startswith(basedir):
              # Skip entries that would be extracted outside the root
              # directory.
              logging.error('Path(%r) is outside root directory', fp)
              continue
            ifd = t.extractfile(ti)
            fp_dir = os.path.dirname(fp)
            if fp_dir not in ensured_dirs:
              file_path.ensure_tree(fp_dir)
              ensured_dirs.add(fp_dir)
            file_mode = ti.mode & 0o700
            if read_only:
              # Enforce read-only if the root bundle does.
              file_mode &= 0o500
            putfile(ifd, fp, file_mode, ti.size)

      else:
        raise isolated_format.IsolatedError('Unknown file type %r' % filetype)
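
# For reference, |props| mirrors a single 'files' entry of a *.isolated file.
# Plausible shapes, with made-up digests and sizes ('m' is the file mode):
#
#   {'h': 'deadbeef', 's': 1432, 'm': 0o755}    # 'basic' file
#   {'h': 'deadbeef', 's': 10240, 't': 'tar'}   # batched tar archive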


def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
                   filter_cb=None):
  """Aggressively downloads the .isolated file(s), then downloads all the files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: ContentAddressedCache class that knows how to store and map files
        locally.
    outdir: Output directory to map the file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.
    filter_cb: filter that acts as a whitelist for downloaded files.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.server_ref.hash_algo
  fetch_queue = FetchQueue(storage, cache)
  bundle = IsolatedBundle(filter_cb)

  with tools.Profiler('GetIsolateds'):
    # Optionally support local files by manually adding them to cache.
    if not isolated_format.is_valid_hash(isolated_hash, algo):
      logging.debug('%s is not a valid hash, assuming a file '
                    '(algo was %s, hash size was %d)',
                    isolated_hash, algo(), algo().digest_size)
      path = six.text_type(os.path.abspath(isolated_hash))
      try:
        isolated_hash = fetch_queue.inject_local_file(path, algo)
      except IOError as e:
        raise isolated_format.MappingError(
            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
            'valid hash (error: %s)?' % (isolated_hash, e))

    # Load all *.isolated and start loading the rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)

  with tools.Profiler('GetRest'):
    # Create the file system hierarchy.
    file_path.ensure_tree(outdir)
    create_directories(outdir, bundle.files)
    _create_symlinks(outdir, bundle.files.items())

    # Ensure the working directory exists.
    cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
    file_path.ensure_tree(cwd)

    # Multimap: digest -> list of pairs (path, props).
    remaining = {}
    for filepath, props in bundle.files.items():
      if 'h' in props:
        remaining.setdefault(props['h'], []).append((filepath, props))
        fetch_queue.wait_on(props['h'])

    # Now block on the remaining files to be downloaded and mapped.
    logging.info('Retrieving remaining files (%d of them)...',
                 fetch_queue.pending_count)
    last_update = time.time()

    with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait()

          # Create the files in the destination using the item in cache as
          # the source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
                                         _map_file, fullpath, digest,
                                         props, cache, bundle.read_only,
                                         use_symlinks)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            sys.stdout.write(msg + '\n')
            sys.stdout.flush()
            logging.info(msg)
            last_update = time.time()
      assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
      putfile_thread_pool.join()

  # Save the cache right away to not lose the state of the new objects.
  cache.save()
  # The cache could evict some items we just tried to fetch; it's a fatal
  # error.
  if not fetch_queue.verify_all_cached():
    free_disk = file_path.get_free_space(cache.cache_dir)
    msg = (
        'Cache is too small to hold all requested files.\n'
        '  %s\n  cache=%d bytes, %d items; %s bytes free') % (
          cache.policies, cache.total_size, len(cache), free_disk)
    raise isolated_format.MappingError(msg)
  return bundle
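
# A minimal usage sketch for fetch_isolated(), assuming |storage| and |cache|
# were built with get_storage() and process_cache_options(); the hash is a
# placeholder:
#
#   bundle = fetch_isolated(
#       isolated_hash=u'deadbeef', storage=storage, cache=cache,
#       outdir=u'/tmp/out', use_symlinks=False)
#   print(bundle.command, bundle.relative_cwd)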


def _directory_to_metadata(root, algo, blacklist):
  """Yields every file and/or symlink found.

  Yields:
    tuple(FileItem, relpath, metadata)
    For a symlink, FileItem is None.
  """
  root = file_path.get_native_path_case(root)
  # Current tar file bundle, if any.
  bundle = TarBundle(root, algo)
  for relpath, issymlink in isolated_format.expand_directory_and_symlink(
      root,
      u'.' + os.path.sep,
      blacklist,
      follow_symlinks=(sys.platform != 'win32')):

    filepath = os.path.join(root, relpath)
    if issymlink:
      # TODO(maruel): Do not call this.
      meta = isolated_format.file_to_metadata(filepath, 0, False)
      yield None, relpath, meta
      continue

    prio = relpath.endswith('.isolated')
    if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
      # The file was added to the current pending tarball and won't be archived
      # individually.
      continue

    # Flush and reset the bundle.
    for i, p, m in bundle.yield_item_path_meta():
      yield i, p, m
    bundle = TarBundle(root, algo)

    # Yield the file individually.
    item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
    yield item, relpath, item.meta

  for i, p, m in bundle.yield_item_path_meta():
    yield i, p, m
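
# Note on bundling: TarBundle batches small files into one tar archive that is
# uploaded (and later mapped with the 't': 'tar' file type) as a single item,
# presumably to cut per-file round trips. This assumes try_add() returns False
# when a file does not fit the pending bundle, at which point the bundle is
# flushed and a fresh one is started, as in the loop above.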


def _print_upload_stats(items, missing):
  """Prints upload stats."""
  total = len(items)
  total_size = sum(f.size for f in items)
  logging.info(
      'Total: %6d, %9.1fkiB', total, total_size / 1024.)
  cache_hit = set(items).difference(missing)
  cache_hit_size = sum(f.size for f in cache_hit)
  logging.info(
      'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_hit),
      cache_hit_size / 1024.,
      len(cache_hit) * 100. / total,
      cache_hit_size * 100. / total_size if total_size else 0)
  cache_miss = missing
  cache_miss_size = sum(f.size for f in cache_miss)
  logging.info(
      'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_miss),
      cache_miss_size / 1024.,
      len(cache_miss) * 100. / total,
      cache_miss_size * 100. / total_size if total_size else 0)


def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
  """Called by archive_files_to_storage for a directory.

  Creates an .isolated file.

  Yields:
    FileItem for every file found, plus one for the .isolated file itself.
  """
  files = {}
  for item, relpath, meta in _directory_to_metadata(
      dirpath, hash_algo, blacklist):
    # item is None for a symlink.
    files[relpath] = meta
    if item:
      yield item

  # TODO(maruel): If there's no file, don't yield an .isolated file.
  data = {
      'algo': hash_algo_name,
      'files': files,
      'version': isolated_format.ISOLATED_FILE_VERSION,
  }
  # Keep the file in memory. This is fine because .isolated files are
  # relatively small.
  yield BufferItem(
      tools.format_json(data, True), algo=hash_algo, high_priority=True)
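
# A sketch of the .isolated data generated above for a directory holding a
# single file; the digest, size and mode are made up, and the version string
# comes from isolated_format.ISOLATED_FILE_VERSION:
#
#   {
#     'algo': 'sha-1',
#     'files': {'foo.txt': {'h': 'deadbeef', 's': 12, 'm': 0o640}},
#     'version': '1.4',
#   }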


def archive_files_to_storage(storage, files, blacklist, verify_push=False):
  """Stores every entry into remote storage and returns stats.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: iterable of files to upload. If a directory is specified (with a
        trailing slash), a .isolated file is created and its hash is returned.
        Duplicates are skipped.
    blacklist: function that returns True if a file should be omitted.
    verify_push: verify files are uploaded correctly by fetching from server.

  Returns:
    tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the .isolated file.

  Raises:
    Re-raises the exception in upload_items(), if there is any.
  """
  # Dict of path to hash.
  results = collections.OrderedDict()
  hash_algo = storage.server_ref.hash_algo
  hash_algo_name = storage.server_ref.hash_algo_name
  # Generator of FileItem to pass to the upload_items() concurrent operation.
  channel = threading_utils.TaskChannel()
  exc_channel = threading_utils.TaskChannel()
  uploaded_digests = set()

  def _upload_items():
    try:
      results = storage.upload_items(channel, verify_push)
      uploaded_digests.update(f.digest for f in results)
    except Exception:
      exc_channel.send_exception()

  t = threading.Thread(target=_upload_items)
  t.start()

  # Keep track locally of the items to determine cold and hot items.
  items_found = []
  try:
    for f in files:
      assert isinstance(f, six.text_type), repr(f)
      if f in results:
        # Duplicate.
        continue
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          item = None
          for item in _enqueue_dir(
              filepath, blacklist, hash_algo, hash_algo_name):
            channel.send_result(item)
            items_found.append(item)
          # The very last item will be the .isolated file.
          if not item:
            # There was no file in the directory.
            continue
        elif fs.isfile(filepath):
          item = FileItem(
              path=filepath,
              algo=hash_algo,
              size=None,
              high_priority=f.endswith('.isolated'))
          channel.send_result(item)
          items_found.append(item)
        else:
          raise Error('%s is neither a file nor a directory.' % f)
        results[f] = item.digest
      except OSError:
        raise Error('Failed to process %s.' % f)
  finally:
    # Stops the generator, so _upload_items() can exit.
    channel.send_done()
    t.join()
    exc_channel.send_done()
    for _ in exc_channel:
      pass

  cold = []
  hot = []
  for i in items_found:
    # Note that multiple FileItem may have the same .digest.
    if i.digest in uploaded_digests:
      cold.append(i)
    else:
      hot.append(i)
  return results, cold, hot
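
# A minimal usage sketch for archive_files_to_storage(); the path is a
# placeholder and a trailing os.path.sep marks a directory upload:
#
#   with get_storage(server_ref) as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'out/build' + os.path.sep], lambda _rel: False)
#   # |results| maps each input path to its digest. |cold| holds items that
#   # were actually uploaded, |hot| the ones the server already had.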


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The command outputs each file that was processed along with its content
  hash. For directories, the .isolated generated for the directory is listed
  as the directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  if files == ['-']:
    files = (l.rstrip('\n\r') for l in sys.stdin)
  if not files:
    parser.error('Nothing to upload')
  files = (f.decode('utf-8') for f in files)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(server_ref) as storage:
      results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
  except (Error, local_caching.NoMoreSpace) as e:
    parser.error(e.args[0])
  print('\n'.join('%s %s' % (h, f) for f, h in results.items()))
  return 0
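
# Example invocations, assuming this file is run as isolateserver.py; the
# server URL and paths are placeholders:
#
#   isolateserver.py archive -I https://isolate.example.com some/file.txt
#   isolateserver.py archive -I https://isolate.example.com some/directory/
#
# Each processed entry is printed as '<hash> <path>'; for a directory, the
# hash is that of the generated .isolated file.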


def CMDdownload(parser, args):
  """Downloads data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.warning('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = six.text_type(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  with get_storage(server_ref) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = six.text_type(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(
                local_caching.file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.next()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching the whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print('  ' + ' '.join(bundle.command))

  return 0
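
# Example invocations, assuming this file is run as isolateserver.py; the
# server URL and hashes are placeholders:
#
#   isolateserver.py download -I https://isolate.example.com \
#       -s deadbeef -t out/
#   isolateserver.py download -I https://isolate.example.com \
#       -f deadbeef payload.bin -t out/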


def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))
  return None


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR', default='cache',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, trim, **kwargs):
  if options.cache:
    policies = local_caching.CachePolicies(
        options.max_cache_size,
        options.min_free_space,
        options.max_items,
        # 3 weeks.
        max_age_secs=21*24*60*60)

    # |options.cache| path may not exist until DiskContentAddressedCache()
    # instance is created.
    return local_caching.DiskContentAddressedCache(
        six.text_type(os.path.abspath(options.cache)), policies, trim, **kwargs)
  return local_caching.MemoryContentAddressedCache()
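
# A sketch of tuning the cache from the command line; the values below are
# arbitrary (the defaults are 50 GiB, 2 GiB free and 100000 items):
#
#   --cache /tmp/isolate-cache --max-cache-size 1073741824 \
#   --min-free-space 536870912 --max-items 10000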


class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001837 sys.exit(main(sys.argv[1:]))