#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import net
from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand
import six
from six.moves import queue as Queue

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import on_error
from utils import subprocess42
from utils import threading_utils


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to the log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check against the isolate server per /pre-upload
# query. All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more items per request, the lower the effect of HTTP round trip latency
# and TCP-level chattiness. On the other hand, larger values cause longer
# lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousand files to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z',
    'avi',
    'cur',
    'gif',
    'h264',
    'jar',
    'jpeg',
    'jpg',
    'mp4',
    'pdf',
    'png',
    'wav',
    'zip',
]

# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file-like object was created by something outside our control (e.g.
  # the standard library's open("test.txt")), name will be a str. We want all
  # our paths to be unicode objects, so we decode it.
  if not isinstance(name, six.text_type):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size - written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure that
    all hardlinks have the same owner.
  - Anecdotal reports suggest ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
    assert fs.exists(dstpath)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE:
    # On Windows, modes other than removing stat.S_IWRITE are ignored. Return
    # early to skip the slow/unnecessary chmod call.
    return

  # file_mode of 0 is actually valid, so an explicit check is needed.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
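
# Example use of putfile (a minimal sketch, not from the original code; the
# paths and mode are hypothetical):
#
#   with fs.open('cache/abc123', 'rb') as src:
#     putfile(src, 'out/test.bin', file_mode=0o600)
#
# When |srcfileobj| maps to a real path on disk and |size| is -1, the file is
# linked or copied via file_path.link_file(); otherwise its content is
# streamed out with fileobj_copy().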
255
256
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000257def zip_compress(content_generator, level=7):
258 """Reads chunks from |content_generator| and yields zip compressed chunks."""
259 compressor = zlib.compressobj(level)
260 for chunk in content_generator:
261 compressed = compressor.compress(chunk)
262 if compressed:
263 yield compressed
264 tail = compressor.flush(zlib.Z_FINISH)
265 if tail:
266 yield tail
267
268
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400269def zip_decompress(
270 content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000271 """Reads zipped data from |content_generator| and yields decompressed data.
272
273 Decompresses data in small chunks (no larger than |chunk_size|) so that
274 zip bomb file doesn't cause zlib to preallocate huge amount of memory.
275
276 Raises IOError if data is corrupted or incomplete.
277 """
278 decompressor = zlib.decompressobj()
279 compressed_size = 0
280 try:
281 for chunk in content_generator:
282 compressed_size += len(chunk)
283 data = decompressor.decompress(chunk, chunk_size)
284 if data:
285 yield data
286 while decompressor.unconsumed_tail:
287 data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
288 if data:
289 yield data
290 tail = decompressor.flush()
291 if tail:
292 yield tail
293 except zlib.error as e:
294 raise IOError(
295 'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
296 # Ensure all data was read and decompressed.
297 if decompressor.unused_data or decompressor.unconsumed_tail:
298 raise IOError('Not all data was decompressed')
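
# Round-trip example for zip_compress/zip_decompress (a minimal sketch, not
# from the original code):
#
#   chunks = [b'hello ', b'world']
#   compressed = list(zip_compress(iter(chunks)))
#   assert b''.join(zip_decompress(iter(compressed))) == b'hello world'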


def _get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  # splitext() keeps the leading dot, but ALREADY_COMPRESSED_TYPES lists bare
  # extensions, so strip the dot before the lookup.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
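
# For instance, an already-compressed '.jpg' is stored as-is (level 0), while
# a '.txt' file gets the default level 7:
#
#   _get_zip_compression_level('photo.jpg')  # -> 0
#   _get_zip_compression_level('log.txt')    # -> 7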


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""
  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()


class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def algo(self):
    return self._algo

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, 0, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)


class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on the
  fly.
  """

  def __init__(self, root, algo):
    # 2 trailing 512-byte headers.
    super(TarBundle, self).__init__(size=1024)
    self._items = []
    self._meta = None
    self._algo = algo
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Tries to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should
    be migrated to use the Go code instead.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # https://crbug.com/825418
    return False
    self._size += rounded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Yields tuples (Item, filepath, meta_dict).

    If the bundle contains fewer than 5 items, the individual items are
    yielded instead of the bundle itself.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
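    # A writer thread streams the tar output into |obj| (a _ThreadFile backed
    # by a TaskChannel) while this generator drains it, so the tarball is
    # never fully materialized in memory.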
    def _tar_thread():
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
        'h': self.digest,
        's': self.size,
        't': u'tar',
    }


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]
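
# Example of building items to upload (a minimal sketch, not from the original
# code; it assumes the namespace uses SHA-1 digests):
#
#   import hashlib
#   file_item = FileItem('out/test.bin', hashlib.sha1)
#   buf_item = BufferItem(b'some in-memory blob', hashlib.sha1)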


class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handler table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
      if sys.maxsize <= max_size:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False
612
Takuto Ikuta26980872020-04-09 06:56:37 +0000613 def upload_items(self, items, verify_push=False):
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000614 """Uploads a generator of Item to the isolate server.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000615
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800616 It figures out what items are missing from the server and uploads only them.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000617
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000618 It uses 3 threads internally:
619 - One to create batches based on a timeout
620 - One to dispatch the /contains RPC and field the missing entries
621 - One to field the /push RPC
622
623 The main threads enumerates 'items' and pushes to the first thread. Then it
624 join() all the threads, waiting for them to complete.
625
626 (enumerate items of Item, this can be slow as disk is traversed)
627 |
628 v
629 _create_items_batches_thread Thread #1
630 (generates list(Item), every 3s or 20~100 items)
631 |
632 v
633 _do_lookups_thread Thread #2
634 | |
635 v v
636 (missing) (was on server)
637 |
638 v
639 _handle_missing_thread Thread #3
640 |
641 v
642 (upload Item, append to uploaded)
643
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000644 Arguments:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400645 items: list of isolate_storage.Item instances that represents data to
646 upload.
Takuto Ikuta26980872020-04-09 06:56:37 +0000647 verify_push: verify files are uploaded correctly by fetching from server.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000648
649 Returns:
650 List of items that were uploaded. All other items are already there.
Ye Kuang4e994292020-04-10 07:07:35 +0000651
652 Raises:
653 The first exception being raised in the worker threads.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000654 """
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000655 incoming = Queue.Queue()
656 batches_to_lookup = Queue.Queue()
657 missing = Queue.Queue()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000658 uploaded = []
Ye Kuang4e994292020-04-10 07:07:35 +0000659 exc_channel = threading_utils.TaskChannel()
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800660
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000661 def _create_items_batches_thread():
662 """Creates batches for /contains RPC lookup from individual items.
663
664 Input: incoming
665 Output: batches_to_lookup
666 """
667 try:
668 batch_size_index = 0
669 batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
670 batch = []
671 while not self._aborted:
672 try:
673 item = incoming.get(True, timeout=3)
674 if item:
675 batch.append(item)
676 except Queue.Empty:
677 item = False
678 if len(batch) == batch_size or (not item and batch):
679 if len(batch) == batch_size:
680 batch_size_index += 1
681 batch_size = ITEMS_PER_CONTAINS_QUERIES[
682 min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
683 batches_to_lookup.put(batch)
684 batch = []
685 if item is None:
686 break
Ye Kuang4e994292020-04-10 07:07:35 +0000687 except Exception:
688 exc_channel.send_exception()
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000689 finally:
690 # Unblock the next pipeline.
691 batches_to_lookup.put(None)
692
693 def _do_lookups_thread():
694 """Enqueues all the /contains RPCs and emits the missing items.
695
696 Input: batches_to_lookup
697 Output: missing, to_upload
698 """
699 try:
700 channel = threading_utils.TaskChannel()
701 def _contains(b):
702 if self._aborted:
703 raise Aborted()
704 return self._storage_api.contains(b)
705
706 pending_contains = 0
707 while not self._aborted:
708 batch = batches_to_lookup.get()
709 if batch is None:
710 break
711 self.net_thread_pool.add_task_with_channel(
712 channel, threading_utils.PRIORITY_HIGH, _contains, batch)
713 pending_contains += 1
714 while pending_contains and not self._aborted:
715 try:
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +0000716 v = channel.next(timeout=0)
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000717 except threading_utils.TaskChannel.Timeout:
718 break
719 pending_contains -= 1
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +0000720 for missing_item, push_state in v.items():
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000721 missing.put((missing_item, push_state))
722 while pending_contains and not self._aborted:
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +0000723 for missing_item, push_state in channel.next().items():
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000724 missing.put((missing_item, push_state))
725 pending_contains -= 1
Ye Kuang4e994292020-04-10 07:07:35 +0000726 except Exception:
727 exc_channel.send_exception()
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000728 finally:
729 # Unblock the next pipeline.
730 missing.put((None, None))
731
732 def _handle_missing_thread():
733 """Sends the missing items to the uploader.
734
735 Input: missing
736 Output: uploaded
737 """
Ye Kuang4e994292020-04-10 07:07:35 +0000738 try:
739 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
740 channel = threading_utils.TaskChannel()
741 pending_upload = 0
742 while not self._aborted:
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000743 try:
Ye Kuang4e994292020-04-10 07:07:35 +0000744 missing_item, push_state = missing.get(True, timeout=5)
745 if missing_item is None:
746 break
747 self._async_push(channel, missing_item, push_state, verify_push)
748 pending_upload += 1
749 except Queue.Empty:
750 pass
751 detector.ping()
752 while not self._aborted and pending_upload:
753 try:
754 item = channel.next(timeout=0)
755 except threading_utils.TaskChannel.Timeout:
756 break
757 uploaded.append(item)
758 pending_upload -= 1
759 logging.debug('Uploaded %d; %d pending: %s (%d)', len(uploaded),
760 pending_upload, item.digest, item.size)
761 while not self._aborted and pending_upload:
762 item = channel.next()
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000763 uploaded.append(item)
764 pending_upload -= 1
765 logging.debug(
766 'Uploaded %d; %d pending: %s (%d)',
767 len(uploaded), pending_upload, item.digest, item.size)
Ye Kuang4e994292020-04-10 07:07:35 +0000768 except Exception:
769 exc_channel.send_exception()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000770
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000771 threads = [
772 threading.Thread(target=_create_items_batches_thread),
773 threading.Thread(target=_do_lookups_thread),
774 threading.Thread(target=_handle_missing_thread),
775 ]
776 for t in threads:
777 t.start()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000778
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000779 try:
780 # For each digest keep only first isolate_storage.Item that matches it.
781 # All other items are just indistinguishable copies from the point of view
782 # of isolate server (it doesn't care about paths at all, only content and
783 # digests).
784 seen = {}
785 try:
786 # TODO(maruel): Reorder the items as a priority queue, with larger items
787 # being processed first. This is, before hashing the data.
788 # This must be done in the primary thread since items can be a
789 # generator.
790 for item in items:
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000791 if seen.setdefault(item.digest, item) is item:
792 incoming.put(item)
793 finally:
794 incoming.put(None)
795 finally:
796 for t in threads:
797 t.join()
Ye Kuang4e994292020-04-10 07:07:35 +0000798 exc_channel.send_done()
799 for _ in exc_channel:
800 # If there is no exception, this loop does nothing. Otherwise, it raises
801 # the first exception put onto |exc_channel|.
802 pass
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000803
804 logging.info('All %s files are uploaded', len(uploaded))
Marc-Antoine Ruel73c0ae72018-11-30 14:05:45 +0000805 if seen:
806 _print_upload_stats(seen.values(), uploaded)
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000807 return uploaded
808
Takuto Ikuta26980872020-04-09 06:56:37 +0000809 def _async_push(self, channel, item, push_state, verify_push=False):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000810 """Starts asynchronous push to the server in a parallel thread.
811
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000812 Can be used only after |item| was checked for presence on a server with a
813 /contains RPC.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800814
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000815 Arguments:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000816 channel: TaskChannel that receives back |item| when upload ends.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400817 item: item to upload as instance of isolate_storage.Item class.
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000818 push_state: push state returned by storage_api.contains(). It contains
819 storage specific information describing how to upload the item (for
820 example in case of cloud storage, it is signed upload URLs).
Takuto Ikuta26980872020-04-09 06:56:37 +0000821 verify_push: verify files are uploaded correctly by fetching from server.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800822
823 Returns:
824 None, but |channel| later receives back |item| when upload ends.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000825 """
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800826 # Thread pool task priority.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400827 priority = (
Vadim Shtayura3148e072014-09-02 18:51:52 -0700828 threading_utils.PRIORITY_HIGH if item.high_priority
829 else threading_utils.PRIORITY_MED)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800830
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000831 def _push(content):
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400832 """Pushes an isolate_storage.Item and returns it to |channel|."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400833 if self._aborted:
834 raise Aborted()
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800835 self._storage_api.push(item, push_state, content)
Takuto Ikuta26980872020-04-09 06:56:37 +0000836 if verify_push:
Takuto Ikutac01532c2020-04-21 07:56:54 +0000837 try:
838 self._fetch(
839 item.digest,
840 item.size,
841 # this consumes all elements from given generator.
842 lambda gen: collections.deque(gen, maxlen=0))
843 except Exception:
844 # reset push_state if failed to verify.
845 push_state.finalized = False
846 push_state.uploaded = False
847 raise
848
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000849 return item
850
Wei Huang1a38fbe2017-11-28 22:55:22 -0500851 # If zipping is not required, just start a push task. Don't pass 'content'
852 # so that it can create a new generator when it retries on failures.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000853 if not self.server_ref.is_with_compression:
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000854 self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000855 return
856
857 # If zipping is enabled, zip in a separate thread.
858 def zip_and_push():
859 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
860 # content right here. It will block until all file is zipped.
861 try:
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400862 if self._aborted:
863 raise Aborted()
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800864 stream = zip_compress(item.content(), item.compression_level)
Lei Lei73a5f732020-03-23 20:36:14 +0000865 # In Python3, zlib.compress returns a byte object instead of str.
866 data = six.b('').join(stream)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000867 except Exception as exc:
868 logging.error('Failed to zip \'%s\': %s', item, exc)
Vadim Shtayura0ffc4092013-11-20 17:49:52 -0800869 channel.send_exception()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000870 return
Wei Huang1a38fbe2017-11-28 22:55:22 -0500871 # Pass '[data]' explicitly because the compressed data is not same as the
872 # one provided by 'item'. Since '[data]' is a list, it can safely be
873 # reused during retries.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000874 self.net_thread_pool.add_task_with_channel(
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000875 channel, priority, _push, [data])
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000876 self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000877
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800878 def push(self, item, push_state):
879 """Synchronously pushes a single item to the server.
880
881 If you need to push many items at once, consider using 'upload_items' or
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000882 '_async_push' with instance of TaskChannel.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800883
884 Arguments:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400885 item: item to upload as instance of isolate_storage.Item class.
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000886 push_state: push state returned by storage_api.contains(). It contains
887 storage specific information describing how to upload the item (for
888 example in case of cloud storage, it is signed upload URLs).
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800889
890 Returns:
891 Pushed item (same object as |item|).
892 """
893 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700894 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000895 self._async_push(channel, item, push_state)
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +0000896 pushed = channel.next()
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800897 assert pushed is item
898 return item
899
Takuto Ikuta26980872020-04-09 06:56:37 +0000900 def _fetch(self, digest, size, sink):
901 try:
902 # Prepare reading pipeline.
903 stream = self._storage_api.fetch(digest, size, 0)
904 if self.server_ref.is_with_compression:
905 stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
906 # Run |stream| through verifier that will assert its size.
907 verifier = FetchStreamVerifier(stream, self.server_ref.hash_algo, digest,
908 size)
909 # Verified stream goes to |sink|.
910 sink(verifier.run())
Takuto Ikuta98dcc372020-04-20 09:09:28 +0000911 except Exception:
912 logging.exception('Failed to fetch %s', digest)
Takuto Ikuta26980872020-04-09 06:56:37 +0000913 raise
914
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000915 def async_fetch(self, channel, priority, digest, size, sink):
916 """Starts asynchronous fetch from the server in a parallel thread.
917
918 Arguments:
919 channel: TaskChannel that receives back |digest| when download ends.
920 priority: thread pool task priority for the fetch.
921 digest: hex digest of an item to download.
922 size: expected size of the item (after decompression).
923 sink: function that will be called as sink(generator).
924 """
925 def fetch():
Takuto Ikuta26980872020-04-09 06:56:37 +0000926 self._fetch(digest, size, sink)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000927 return digest
928
929 # Don't bother with zip_thread_pool for decompression. Decompression is
930 # really fast and most probably IO bound anyway.
931 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
932
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000933
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000934class FetchQueue(object):
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400935 """Fetches items from Storage and places them into ContentAddressedCache.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000936
937 It manages multiple concurrent fetch operations. Acts as a bridge between
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400938 Storage and ContentAddressedCache so that Storage and ContentAddressedCache
939 don't depend on each other at all.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000940 """
941
942 def __init__(self, storage, cache):
943 self.storage = storage
944 self.cache = cache
945 self._channel = threading_utils.TaskChannel()
946 self._pending = set()
947 self._accessed = set()
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +0000948 self._fetched = set(cache)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400949 # Pending digests that the caller waits for, see wait_on()/wait().
950 self._waiting_on = set()
951 # Already fetched digests the caller waits for which are not yet returned by
952 # wait().
953 self._waiting_on_ready = set()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000954
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400955 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700956 self,
957 digest,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400958 size=local_caching.UNKNOWN_FILE_SIZE,
Vadim Shtayura3148e072014-09-02 18:51:52 -0700959 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000960 """Starts asynchronous fetch of item |digest|."""
961 # Fetching it now?
962 if digest in self._pending:
963 return
964
965 # Mark this file as in use, verify_all_cached will later ensure it is still
966 # in cache.
967 self._accessed.add(digest)
968
969 # Already fetched? Notify cache to update item's LRU position.
970 if digest in self._fetched:
971 # 'touch' returns True if item is in cache and not corrupted.
972 if self.cache.touch(digest, size):
973 return
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +0000974 logging.error('%s is corrupted', digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000975 self._fetched.remove(digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000976
977 # TODO(maruel): It should look at the free disk space, the current cache
978 # size and the size of the new item on every new item:
979 # - Trim the cache as more entries are listed when free disk space is low,
980 # otherwise if the amount of data downloaded during the run > free disk
981 # space, it'll crash.
982 # - Make sure there's enough free disk space to fit all dependencies of
983 # this run! If not, abort early.
984
985 # Start fetching.
986 self._pending.add(digest)
987 self.storage.async_fetch(
988 self._channel, priority, digest, size,
989 functools.partial(self.cache.write, digest))
990
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400991 def wait_on(self, digest):
992 """Updates digests to be waited on by 'wait'."""
993 # Calculate once the already fetched items. These will be retrieved first.
994 if digest in self._fetched:
995 self._waiting_on_ready.add(digest)
996 else:
997 self._waiting_on.add(digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000998
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400999 def wait(self):
1000 """Waits until any of waited-on items is retrieved.
1001
1002 Once this happens, it is remove from the waited-on set and returned.
1003
1004 This function is called in two waves. The first wave it is done for HIGH
1005 priority items, the isolated files themselves. The second wave it is called
1006 for all the files.
1007
1008 If the waited-on set is empty, raises RuntimeError.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001009 """
1010 # Flush any already fetched items.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001011 if self._waiting_on_ready:
1012 return self._waiting_on_ready.pop()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001013
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001014 assert self._waiting_on, 'Needs items to wait on'
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001015
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001016 # Wait for one waited-on item to be fetched.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001017 while self._pending:
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +00001018 digest = self._channel.next()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001019 self._pending.remove(digest)
1020 self._fetched.add(digest)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001021 if digest in self._waiting_on:
1022 self._waiting_on.remove(digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001023 return digest
1024
1025 # Should never reach this point due to assert above.
1026 raise RuntimeError('Impossible state')
1027
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001028 @property
1029 def wait_queue_empty(self):
1030 """Returns True if there is no digest left for wait() to return."""
1031 return not self._waiting_on and not self._waiting_on_ready
1032
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001033 def inject_local_file(self, path, algo):
1034 """Adds local file to the cache as if it was fetched from storage."""
maruel12e30012015-10-09 11:55:35 -07001035 with fs.open(path, 'rb') as f:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001036 data = f.read()
1037 digest = algo(data).hexdigest()
1038 self.cache.write(digest, [data])
1039 self._fetched.add(digest)
1040 return digest
1041
1042 @property
1043 def pending_count(self):
1044 """Returns number of items to be fetched."""
1045 return len(self._pending)
1046
1047 def verify_all_cached(self):
1048 """True if all accessed items are in cache."""
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +00001049 # Not thread safe, but called after all work is done.
1050 return self._accessed.issubset(self.cache)
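
# Typical use of FetchQueue, mirroring IsolatedBundle.fetch() below (a sketch;
# |storage| is a Storage and |cache| a ContentAddressedCache):
#
#   fetch_queue = FetchQueue(storage, cache)
#   fetch_queue.wait_on(digest)
#   fetch_queue.add(digest, priority=threading_utils.PRIORITY_HIGH)
#   while not fetch_queue.wait_queue_empty:
#     fetched_digest = fetch_queue.wait()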


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies that |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions
    since otherwise Storage will retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
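
# Example use of FetchStreamVerifier (a minimal sketch assuming SHA-1 digests):
#
#   import hashlib
#   chunks = [b'hello ', b'world']
#   digest = hashlib.sha1(b'hello world').hexdigest()
#   verifier = FetchStreamVerifier(iter(chunks), hashlib.sha1, digest, 11)
#   data = b''.join(verifier.run())  # Raises IOError on size/digest mismatch.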
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001126
1127
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001128class IsolatedBundle(object):
1129 """Fetched and parsed .isolated file with all dependencies."""
1130
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001131 def __init__(self, filter_cb):
1132 """
1133 filter_cb: callback function to filter downloaded content.
1134 When filter_cb is not None, Isolated file is downloaded iff
1135 filter_cb(filepath) returns True.
1136 """
1137
Vadim Shtayura3148e072014-09-02 18:51:52 -07001138 self.command = []
1139 self.files = {}
1140 self.read_only = None
1141 self.relative_cwd = None
1142 # The main .isolated file, a IsolatedFile instance.
1143 self.root = None
1144
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001145 self._filter_cb = filter_cb
1146
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001147 def fetch(self, fetch_queue, root_isolated_hash, algo):
1148 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001149
1150 It enables support for "included" .isolated files. They are processed in
1151 strict order but fetched asynchronously from the cache. This is important so
1152 that a file in an included .isolated file that is overridden by an embedding
1153 .isolated file is not fetched needlessly. The includes are fetched in one
1154 pass and the files are fetched as soon as all the ones on the left-side
1155 of the tree were fetched.
1156
1157 The prioritization is very important here for nested .isolated files.
1158 'includes' have the highest priority and the algorithm is optimized for both
1159 deep and wide trees. A deep one is a long link of .isolated files referenced
1160 one at a time by one item in 'includes'. A wide one has a large number of
1161 'includes' in a single .isolated file. 'left' is defined as an included
1162 .isolated file earlier in the 'includes' list. So the order of the elements
1163 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001164
1165 As a side effect this method starts asynchronous fetch of all data files
1166 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1167 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001168 """
1169 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1170
1171 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1172 pending = {}
1173 # Set of hashes of already retrieved items to refuse recursive includes.
1174 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001175 # Set of IsolatedFile's whose data files have already being fetched.
1176 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001177
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001178 def retrieve_async(isolated_file):
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001179 """Retrieves an isolated file included by the root bundle."""
Vadim Shtayura3148e072014-09-02 18:51:52 -07001180 h = isolated_file.obj_hash
1181 if h in seen:
1182 raise isolated_format.IsolatedError(
1183 'IsolatedFile %s is retrieved recursively' % h)
1184 assert h not in pending
1185 seen.add(h)
1186 pending[h] = isolated_file
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001187 # This isolated item is being added dynamically, notify FetchQueue.
1188 fetch_queue.wait_on(h)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001189 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1190
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001191 # Start fetching root *.isolated file (single file, not the whole bundle).
1192 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001193
1194 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001195 # Wait until some *.isolated file is fetched, parse it.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001196 item_hash = fetch_queue.wait()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001197 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001198 with fetch_queue.cache.getfileobj(item_hash) as f:
1199 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001200
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001201 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001202 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001203 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001204
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001205 # Always fetch *.isolated files in traversal order, waiting if necessary
1206 # until next to-be-processed node loads. "Waiting" is done by yielding
1207 # back to the outer loop, that waits until some *.isolated is loaded.
1208 for node in isolated_format.walk_includes(self.root):
1209 if node not in processed:
1210 # Not visited, and not yet loaded -> wait for it to load.
1211 if not node.is_loaded:
1212 break
1213 # Not visited and loaded -> process it and continue the traversal.
1214 self._start_fetching_files(node, fetch_queue)
1215 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001216
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001217 # All *.isolated files should be processed by now and only them.
1218 all_isolateds = set(isolated_format.walk_includes(self.root))
1219 assert all_isolateds == processed, (all_isolateds, processed)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001220 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
Vadim Shtayura3148e072014-09-02 18:51:52 -07001221
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001222 # Extract 'command' and other bundle properties.
1223 for node in isolated_format.walk_includes(self.root):
1224 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001225 self.relative_cwd = self.relative_cwd or ''
1226
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001227 def _start_fetching_files(self, isolated, fetch_queue):
1228 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001229
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001230 Modifies self.files.
1231 """
maruel10bea7b2016-12-07 05:03:49 -08001232 files = isolated.data.get('files', {})
1233 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001234 for filepath, properties in files.items():
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001235 if self._filter_cb and not self._filter_cb(filepath):
1236 continue
1237
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001238       # The root isolated takes priority for the files being mapped. In
1239       # particular, overridden files must not be fetched.
1240 if filepath not in self.files:
1241 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001242
1243       # Make sure that if the isolated is read-only, the mode doesn't have
1244       # write bits.
1245 if 'm' in properties and self.read_only:
1246 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1247
1248 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001249 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001250 fetch_queue.add(
1251 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1252
1253 def _update_self(self, node):
1254 """Extracts bundle global parameters from loaded *.isolated file.
1255
1256 Will be called with each loaded *.isolated file in order of traversal of
1257 isolated include graph (see isolated_format.walk_includes).
1258 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001259 # Grabs properties.
1260 if not self.command and node.data.get('command'):
1261 # Ensure paths are correctly separated on windows.
1262 self.command = node.data['command']
1263 if self.command:
1264 self.command[0] = self.command[0].replace('/', os.path.sep)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001265 if self.read_only is None and node.data.get('read_only') is not None:
1266 self.read_only = node.data['read_only']
1267 if (self.relative_cwd is None and
1268 node.data.get('relative_cwd') is not None):
1269 self.relative_cwd = node.data['relative_cwd']
1270
1271
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001272def get_storage(server_ref):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001273   """Returns a Storage instance that uploads to and downloads from |server_ref|.
1274
1275 Arguments:
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001276 server_ref: isolate_storage.ServerRef instance.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001277
1278 Returns:
1279 Instance of Storage.
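
  Example (a sketch; the server URL is hypothetical):

    server_ref = isolate_storage.ServerRef(
        'https://isolate.example.appspot.com', 'default-gzip')
    with get_storage(server_ref) as storage:
      ...  # e.g. archive_files_to_storage(storage, files, blacklist)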
1280 """
Lei Lei73a5f732020-03-23 20:36:14 +00001281 # Handle the specific internal use case.
1282 assert (isinstance(server_ref, isolate_storage.ServerRef) or
1283 type(server_ref).__name__ == 'ServerRef'), repr(server_ref)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001284 return Storage(isolate_storage.get_storage_api(server_ref))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001285
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001286
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001287def _map_file(dst, digest, props, cache, read_only, use_symlinks):
1288 """Put downloaded file to destination path. This function is used for multi
1289 threaded file putting.
1290 """
Takuto Ikuta523c6472019-09-18 02:53:34 +00001291 with tools.Profiler("_map_file for %s" % dst):
1292 with cache.getfileobj(digest) as srcfileobj:
1293 filetype = props.get('t', 'basic')
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001294
Takuto Ikuta523c6472019-09-18 02:53:34 +00001295 if filetype == 'basic':
1296 # Ignore all bits apart from the user.
1297 file_mode = (props.get('m') or 0o500) & 0o700
1298 if read_only:
1299 # Enforce read-only if the root bundle does.
1300 file_mode &= 0o500
1301 putfile(srcfileobj, dst, file_mode, use_symlink=use_symlinks)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001302
Takuto Ikuta523c6472019-09-18 02:53:34 +00001303 elif filetype == 'tar':
1304 basedir = os.path.dirname(dst)
1305 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1306 ensured_dirs = set()
1307 for ti in t:
1308 if not ti.isfile():
1309               logging.warning('Path(%r) is not a regular file (%s), skipped',
1310                               ti.name, ti.type)
1311 continue
1312 # Handle files created on Windows fetched on POSIX and the
1313 # reverse.
1314 other_sep = '/' if os.path.sep == '\\' else '\\'
1315 name = ti.name.replace(other_sep, os.path.sep)
1316 fp = os.path.normpath(os.path.join(basedir, name))
1317             if not fp.startswith(basedir):
1318               logging.error('Path(%r) is outside root directory, skipped', fp)
                   continue
1319 ifd = t.extractfile(ti)
1320 fp_dir = os.path.dirname(fp)
1321 if fp_dir not in ensured_dirs:
1322 file_path.ensure_tree(fp_dir)
1323 ensured_dirs.add(fp_dir)
1324 file_mode = ti.mode & 0o700
1325 if read_only:
1326 # Enforce read-only if the root bundle does.
1327 file_mode &= 0o500
1328 putfile(ifd, fp, file_mode, ti.size)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001329
Takuto Ikuta523c6472019-09-18 02:53:34 +00001330 else:
1331 raise isolated_format.IsolatedError('Unknown file type %r' % filetype)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001332
1333
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001334def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1335 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001336   """Aggressively downloads the .isolated file(s), then downloads all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001337
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001338 Arguments:
1339 isolated_hash: hash of the root *.isolated file.
1340 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001341 cache: ContentAddressedCache class that knows how to store and map files
1342 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001343 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001344 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001345     filter_cb: filter that acts as a whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001346
1347 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001348 IsolatedBundle object that holds details about loaded *.isolated file.
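
  Example (a sketch; assumes |storage| and |cache| were created beforehand,
  e.g. via get_storage() and process_cache_options()):

    bundle = fetch_isolated(
        isolated_hash=root_hash,
        storage=storage,
        cache=cache,
        outdir=u'/tmp/run',
        use_symlinks=False)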
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001349 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001350 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001351 'fetch_isolated(%s, %s, %s, %s, %s)',
1352 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001353   # Hash algorithm to use, defined by the namespace |storage| is using.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001354 algo = storage.server_ref.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001355 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001356 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001357
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001358 with tools.Profiler('GetIsolateds'):
1359 # Optionally support local files by manually adding them to cache.
1360 if not isolated_format.is_valid_hash(isolated_hash, algo):
Junji Watanabe38b28b02020-04-23 10:23:30 +00001361 logging.debug(
1362 '%s is not a valid hash, assuming a file '
1363 '(algo was %s, hash size was %d)', isolated_hash, algo(),
1364 algo().digest_size)
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001365 path = six.text_type(os.path.abspath(isolated_hash))
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001366 try:
1367 isolated_hash = fetch_queue.inject_local_file(path, algo)
1368 except IOError as e:
1369 raise isolated_format.MappingError(
1370         '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1371 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001372
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001373 # Load all *.isolated and start loading rest of the files.
1374 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001375
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001376 with tools.Profiler('GetRest'):
1377 # Create file system hierarchy.
1378 file_path.ensure_tree(outdir)
1379 create_directories(outdir, bundle.files)
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001380 _create_symlinks(outdir, bundle.files.items())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001381
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001382 # Ensure working directory exists.
1383 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
1384 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001385
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001386 # Multimap: digest -> list of pairs (path, props).
1387 remaining = {}
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001388 for filepath, props in bundle.files.items():
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001389 if 'h' in props:
1390 remaining.setdefault(props['h'], []).append((filepath, props))
1391 fetch_queue.wait_on(props['h'])
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001392
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001393 # Now block on the remaining files to be downloaded and mapped.
1394 logging.info('Retrieving remaining files (%d of them)...',
1395 fetch_queue.pending_count)
1396 last_update = time.time()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001397
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001398 with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
1399 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1400 while remaining:
1401 detector.ping()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001402
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001403 # Wait for any item to finish fetching to cache.
1404 digest = fetch_queue.wait()
tansell9e04a8d2016-07-28 09:31:59 -07001405
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001406 # Create the files in the destination using item in cache as the
1407 # source.
1408 for filepath, props in remaining.pop(digest):
1409 fullpath = os.path.join(outdir, filepath)
tanselle4288c32016-07-28 09:45:40 -07001410
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001411 putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
1412 _map_file, fullpath, digest,
1413 props, cache, bundle.read_only,
1414 use_symlinks)
tanselle4288c32016-07-28 09:45:40 -07001415
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001416 # Report progress.
1417 duration = time.time() - last_update
1418 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1419 msg = '%d files remaining...' % len(remaining)
1420 sys.stdout.write(msg + '\n')
1421 sys.stdout.flush()
1422 logging.info(msg)
1423 last_update = time.time()
1424 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
1425 putfile_thread_pool.join()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001426
Marc-Antoine Ruele9558372018-08-03 03:41:22 +00001427   # Save the cache right away so as not to lose the state of the new objects.
1428 cache.save()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001429   # If the cache evicts items we just tried to fetch, it's a fatal error.
1430 if not fetch_queue.verify_all_cached():
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001431 free_disk = file_path.get_free_space(cache.cache_dir)
1432 msg = (
1433 'Cache is too small to hold all requested files.\n'
1434           ' %s\n cache=%d bytes, %d items; %s bytes free') % (
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +00001435 cache.policies, cache.total_size, len(cache), free_disk)
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001436 raise isolated_format.MappingError(msg)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001437 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001438
1439
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001440def _directory_to_metadata(root, algo, blacklist):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001441 """Yields every file and/or symlink found.
1442
1443 Yields:
1444 tuple(FileItem, relpath, metadata)
1445 For a symlink, FileItem is None.
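
  Example yields (a sketch; the metadata keys follow the .isolated format:
  'h' hash, 's' size, 'm' mode, 'l' symlink target):

    (FileItem(...), u'foo/bar.txt', {'h': u'<digest>', 's': 123, 'm': 0o444})
    (None, u'foo/link', {'l': u'bar.txt'})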
1446 """
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001447 # Current tar file bundle, if any.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001448 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001449 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001450 for relpath, issymlink in isolated_format.expand_directory_and_symlink(
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001451 root,
1452 u'.' + os.path.sep,
1453 blacklist,
1454 follow_symlinks=(sys.platform != 'win32')):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001455
1456 filepath = os.path.join(root, relpath)
1457 if issymlink:
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001458 # TODO(maruel): Do not call this.
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001459 meta = isolated_format.file_to_metadata(filepath, 0, False)
1460 yield None, relpath, meta
1461 continue
1462
1463 prio = relpath.endswith('.isolated')
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001464 if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
1465 # The file was added to the current pending tarball and won't be archived
1466 # individually.
1467 continue
1468
1469 # Flush and reset the bundle.
1470 for i, p, m in bundle.yield_item_path_meta():
1471 yield i, p, m
1472 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001473
1474 # Yield the file individually.
1475 item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
1476 yield item, relpath, item.meta
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001477
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001478 for i, p, m in bundle.yield_item_path_meta():
1479 yield i, p, m
1480
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001481
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001482def _print_upload_stats(items, missing):
1483 """Prints upload stats."""
1484 total = len(items)
1485 total_size = sum(f.size for f in items)
1486 logging.info(
1487 'Total: %6d, %9.1fkiB', total, total_size / 1024.)
1488 cache_hit = set(items).difference(missing)
1489 cache_hit_size = sum(f.size for f in cache_hit)
1490 logging.info(
1491 'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1492 len(cache_hit),
1493 cache_hit_size / 1024.,
1494 len(cache_hit) * 100. / total,
1495 cache_hit_size * 100. / total_size if total_size else 0)
1496 cache_miss = missing
1497 cache_miss_size = sum(f.size for f in cache_miss)
Junji Watanabe38b28b02020-04-23 10:23:30 +00001498 logging.info('cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1499 len(cache_miss), cache_miss_size / 1024.,
1500 len(cache_miss) * 100. / total,
1501 cache_miss_size * 100. / total_size if total_size else 0)
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001502
1503
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001504def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001505 """Called by archive_files_to_storage for a directory.
1506
1507   Creates a .isolated file.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001508
1509 Yields:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001510 FileItem for every file found, plus one for the .isolated file itself.
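
  The trailing BufferItem holds the generated .isolated contents, roughly:

    {
      'algo': hash_algo_name,  # e.g. 'sha-1'
      'files': {u'foo/bar.txt': {'h': u'<digest>', 's': 123, 'm': 0o444}},
      'version': isolated_format.ISOLATED_FILE_VERSION,
    }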
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001511 """
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001512 files = {}
1513 for item, relpath, meta in _directory_to_metadata(
1514 dirpath, hash_algo, blacklist):
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001515 # item is None for a symlink.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001516 files[relpath] = meta
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001517 if item:
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001518 yield item
1519
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001520   # TODO(maruel): If there's no file, don't yield a .isolated file.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001521 data = {
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001522 'algo': hash_algo_name,
1523 'files': files,
1524 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001525 }
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001526 # Keep the file in memory. This is fine because .isolated files are relatively
1527 # small.
1528 yield BufferItem(
1529 tools.format_json(data, True), algo=hash_algo, high_priority=True)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001530
1531
Takuto Ikuta8a5e5992020-04-28 05:09:45 +00001532def _archive_files_to_storage_internal(storage,
1533 files,
1534 blacklist,
1535 verify_push=False):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001536 """Stores every entry into remote storage and returns stats.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001537
1538 Arguments:
1539 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001540 files: iterable of files to upload. If a directory is specified (with a
1541 trailing slash), a .isolated file is created and its hash is returned.
1542 Duplicates are skipped.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001543 blacklist: function that returns True if a file should be omitted.
Takuto Ikuta26980872020-04-09 06:56:37 +00001544 verify_push: verify files are uploaded correctly by fetching from server.
maruel064c0a32016-04-05 11:47:15 -07001545
1546 Returns:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001547 tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001548 The first file in the first item is always the .isolated file.
Ye Kuang4e994292020-04-10 07:07:35 +00001549
1550 Raises:
1551 Re-raises the exception in upload_items(), if there is any.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001552 """
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001553 # Dict of path to hash.
1554 results = collections.OrderedDict()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001555 hash_algo = storage.server_ref.hash_algo
1556 hash_algo_name = storage.server_ref.hash_algo_name
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001557 # Generator of FileItem to pass to upload_items() concurrent operation.
1558 channel = threading_utils.TaskChannel()
Ye Kuang4e994292020-04-10 07:07:35 +00001559 exc_channel = threading_utils.TaskChannel()
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001560 uploaded_digests = set()
Ye Kuang4e994292020-04-10 07:07:35 +00001561
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001562 def _upload_items():
Ye Kuang4e994292020-04-10 07:07:35 +00001563 try:
1564       uploaded = storage.upload_items(channel, verify_push)
1565       uploaded_digests.update(f.digest for f in uploaded)
1566 except Exception:
1567 exc_channel.send_exception()
1568
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001569 t = threading.Thread(target=_upload_items)
1570 t.start()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001571
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001572 # Keep track locally of the items to determine cold and hot items.
1573 items_found = []
1574 try:
1575 for f in files:
Takuto Ikuta95459dd2019-10-29 12:39:47 +00001576 assert isinstance(f, six.text_type), repr(f)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001577 if f in results:
1578 # Duplicate
1579 continue
1580 try:
1581 filepath = os.path.abspath(f)
1582 if fs.isdir(filepath):
1583 # Uploading a whole directory.
1584 item = None
1585 for item in _enqueue_dir(
1586 filepath, blacklist, hash_algo, hash_algo_name):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001587 channel.send_result(item)
1588 items_found.append(item)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001589 # The very last item will be the .isolated file.
1590 if not item:
1591 # There was no file in the directory.
1592 continue
1593 elif fs.isfile(filepath):
1594 item = FileItem(
1595 path=filepath,
1596 algo=hash_algo,
1597 size=None,
1598 high_priority=f.endswith('.isolated'))
1599 channel.send_result(item)
1600 items_found.append(item)
1601 else:
1602         raise Error('%s is neither a file nor a directory.' % f)
1603 results[f] = item.digest
1604       except OSError as e:
1605         raise Error('Failed to process %s: %s.' % (f, e))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001606 finally:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001607 # Stops the generator, so _upload_items() can exit.
1608 channel.send_done()
1609 t.join()
Ye Kuang4e994292020-04-10 07:07:35 +00001610 exc_channel.send_done()
Takuto Ikuta640fcbf2020-04-23 00:12:46 +00001611
1612 try:
1613 for _ in exc_channel:
1614 pass
1615 except Exception:
1616     # Log the items when the upload failed.
1617 for item in items_found:
1618 if isinstance(item, FileItem):
1619 logging.error('FileItem path: %s, digest:%s, re-calculated digest:%s',
1620 item.path, item.digest,
1621 isolated_format.hash_file(item.path, item.algo))
1622 continue
1623
1624 logging.error('Item digest:%s', item.digest)
1625
1626 raise
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001627
1628 cold = []
1629 hot = []
1630 for i in items_found:
1631 # Note that multiple FileItem may have the same .digest.
1632 if i.digest in uploaded_digests:
1633 cold.append(i)
1634 else:
1635 hot.append(i)
1636 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001637
1638
Takuto Ikuta8a5e5992020-04-28 05:09:45 +00001639# TODO(crbug.com/1073832):
1640# remove this once the process leak in the coverage build is fixed.
1641def archive_files_to_storage(storage, files, blacklist, verify_push=False):
1642 """Calls _archive_files_to_storage_internal with retry.
1643
1644 Arguments:
1645 See Arguments section in _archive_files_to_storage_internal
1646
1647 Returns:
1648 See Returns section in _archive_files_to_storage_internal
1649
1650 Raises:
1651     Re-raises the exception from _archive_files_to_storage_internal if all
1652     retries failed.
1653 """
1654
1655 # Will do exponential backoff.
1656 # e.g. 10, 20, 40, 80
1657 backoff = 10
1658
1659 while True:
1660 try:
1661 return _archive_files_to_storage_internal(storage, files, blacklist,
1662 verify_push)
1663 except Exception:
1664 if backoff > 100:
1665 raise
1666
Takuto Ikutaf98be312020-05-08 10:22:54 +00001667 on_error.report('error before %d second backoff' % backoff)
1668
Takuto Ikuta8a5e5992020-04-28 05:09:45 +00001669 logging.exception(
1670 'failed to run _archive_files_to_storage_internal,'
1671 ' will retry after %d seconds', backoff)
1672 time.sleep(backoff)
1673 backoff *= 2
1674
1675
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001676@subcommand.usage('<file1..fileN> or - to read from stdin')
1677def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001678 """Archives data to the server.
1679
1680   If a directory is specified, a .isolated file is created and the whole
1681   directory is uploaded. Then this .isolated file can be included in another
1682   one to run commands.
1683
1684   The command outputs each file that was processed with its content hash. For
1685 directories, the .isolated generated for the directory is listed as the
1686 directory entry itself.
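
  Example (the server URL is hypothetical):

    isolateserver.py archive -I https://isolate.example.appspot.com ./build/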
1687 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001688 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001689 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001690 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001691 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001692 server_ref = isolate_storage.ServerRef(
1693 options.isolate_server, options.namespace)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001694 if files == ['-']:
1695 files = (l.rstrip('\n\r') for l in sys.stdin)
1696 if not files:
1697 parser.error('Nothing to upload')
1698 files = (f.decode('utf-8') for f in files)
1699 blacklist = tools.gen_blacklist(options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001700 try:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001701 with get_storage(server_ref) as storage:
1702 results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001703 except (Error, local_caching.NoMoreSpace) as e:
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001704 parser.error(e.args[0])
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001705 print('\n'.join('%s %s' % (h, f) for f, h in results.items()))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001706 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001707
1708
1709def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001710 """Download data from the server.
1711
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001712 It can either download individual files or a complete tree from a .isolated
1713 file.
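
  Examples (the server URL and hashes are hypothetical):

    isolateserver.py download -I https://isolate.example.appspot.com \
        -s <isolated_hash> -t ./out
    isolateserver.py download -I https://isolate.example.appspot.com \
        -f <hash> payload.bin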
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001714 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001715 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001716 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05001717 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001718 help='hash of an isolated file, .isolated file content is discarded, use '
1719 '--file if you need it')
1720 parser.add_option(
Junji Watanabe38b28b02020-04-23 10:23:30 +00001721 '-f',
1722 '--file',
1723 metavar='HASH DEST',
1724 default=[],
1725 action='append',
1726 nargs=2,
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001727 help='hash and destination of a file, can be used multiple times')
1728 parser.add_option(
Junji Watanabe38b28b02020-04-23 10:23:30 +00001729 '-t',
1730 '--target',
1731 metavar='DIR',
1732 default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001733 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07001734 parser.add_option(
Junji Watanabe38b28b02020-04-23 10:23:30 +00001735 '--use-symlinks',
1736 action='store_true',
maruel4409e302016-07-19 14:25:51 -07001737 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001738 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001739 options, args = parser.parse_args(args)
1740 if args:
1741 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruel5028ba22017-08-25 17:37:51 -04001742 if not file_path.enable_symlink():
Marc-Antoine Ruel5a024272019-01-15 20:11:16 +00001743 logging.warning('Symlink support is not enabled')
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001744
nodir55be77b2016-05-03 09:39:57 -07001745 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001746 if bool(options.isolated) == bool(options.file):
1747 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07001748 if not options.cache and options.use_symlinks:
1749     parser.error('--use-symlinks requires the use of a cache with --cache')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001750
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001751 cache = process_cache_options(options, trim=True)
maruel2e8d0f52016-07-16 07:51:29 -07001752 cache.cleanup()
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001753 options.target = six.text_type(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001754 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07001755 if (fs.isfile(options.target) or
1756 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001757 parser.error(
1758 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001759 server_ref = isolate_storage.ServerRef(
1760 options.isolate_server, options.namespace)
1761 with get_storage(server_ref) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08001762 # Fetching individual files.
1763 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001764 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08001765 channel = threading_utils.TaskChannel()
1766 pending = {}
1767 for digest, dest in options.file:
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001768 dest = six.text_type(dest)
Vadim Shtayura3172be52013-12-03 12:49:05 -08001769 pending[digest] = dest
1770 storage.async_fetch(
Junji Watanabe38b28b02020-04-23 10:23:30 +00001771 channel, threading_utils.PRIORITY_MED, digest,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001772 local_caching.UNKNOWN_FILE_SIZE,
Junji Watanabe38b28b02020-04-23 10:23:30 +00001773 functools.partial(local_caching.file_write,
1774 os.path.join(options.target, dest)))
Vadim Shtayura3172be52013-12-03 12:49:05 -08001775 while pending:
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +00001776 fetched = channel.next()
Vadim Shtayura3172be52013-12-03 12:49:05 -08001777 dest = pending.pop(fetched)
1778 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001779
Vadim Shtayura3172be52013-12-03 12:49:05 -08001780 # Fetching whole isolated tree.
1781 if options.isolated:
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001782 bundle = fetch_isolated(
1783 isolated_hash=options.isolated,
1784 storage=storage,
1785 cache=cache,
1786 outdir=options.target,
1787 use_symlinks=options.use_symlinks)
1788 cache.trim()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001789 if bundle.command:
1790 rel = os.path.join(options.target, bundle.relative_cwd)
1791 print('To run this test please run from the directory %s:' %
1792             rel)
1793 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001794
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001795 return 0
1796
1797
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001798def add_archive_options(parser):
1799 parser.add_option(
1800 '--blacklist',
1801 action='append', default=list(DEFAULT_BLACKLIST),
1802 help='List of regexp to use as blacklist filter when uploading '
1803 'directories')
1804
1805
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001806def add_isolate_server_options(parser):
1807 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001808 parser.add_option(
1809 '-I', '--isolate-server',
1810 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001811 help='URL of the Isolate Server to use. Defaults to the environment '
1812 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1813 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001814 parser.add_option(
aludwind7b7b7e2017-06-29 16:38:50 -07001815 '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
aludwin81178302016-11-30 17:18:49 -08001816 parser.add_option(
Junji Watanabe38b28b02020-04-23 10:23:30 +00001817 '--namespace',
1818 default='default-gzip',
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001819 help='The namespace to use on the Isolate Server, default: %default')
1820
1821
nodir55be77b2016-05-03 09:39:57 -07001822def process_isolate_server_options(
1823 parser, options, set_exception_handler, required):
1824 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001825
1826 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001827 """
1828 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001829 if required:
1830 parser.error('--isolate-server is required.')
1831 return
1832
aludwind7b7b7e2017-06-29 16:38:50 -07001833 if options.grpc_proxy:
1834 isolate_storage.set_grpc_proxy(options.grpc_proxy)
aludwin81178302016-11-30 17:18:49 -08001835 else:
1836 try:
1837 options.isolate_server = net.fix_url(options.isolate_server)
1838 except ValueError as e:
1839 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001840 if set_exception_handler:
1841 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001842 try:
1843 return auth.ensure_logged_in(options.isolate_server)
1844 except ValueError as e:
1845 parser.error(str(e))
Marc-Antoine Ruel793bff32019-04-18 17:50:48 +00001846 return None
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001847
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001848
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001849def add_cache_options(parser):
1850 cache_group = optparse.OptionGroup(parser, 'Cache management')
1851 cache_group.add_option(
Marc-Antoine Ruel5aeb3bb2018-06-16 13:11:02 +00001852 '--cache', metavar='DIR', default='cache',
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001853 help='Directory to keep a local cache of the files. Accelerates download '
1854 'by reusing already downloaded files. Default=%default')
1855 cache_group.add_option(
1856 '--max-cache-size',
1857 type='int',
1858 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08001859 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001860 help='Trim if the cache gets larger than this value, default=%default')
1861 cache_group.add_option(
1862 '--min-free-space',
1863 type='int',
1864 metavar='NNN',
1865 default=2*1024*1024*1024,
1866 help='Trim if disk free space becomes lower than this value, '
1867 'default=%default')
1868 cache_group.add_option(
1869 '--max-items',
1870 type='int',
1871 metavar='NNN',
1872 default=100000,
1873       help='Trim if more than this number of items are in the cache '
Junji Watanabe38b28b02020-04-23 10:23:30 +00001874 'default=%default')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001875 parser.add_option_group(cache_group)
1876
1877
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001878def process_cache_options(options, trim, **kwargs):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001879 if options.cache:
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -04001880 policies = local_caching.CachePolicies(
1881 options.max_cache_size,
1882 options.min_free_space,
1883 options.max_items,
1884 # 3 weeks.
Junji Watanabe38b28b02020-04-23 10:23:30 +00001885 max_age_secs=21 * 24 * 60 * 60)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001886
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001887 # |options.cache| path may not exist until DiskContentAddressedCache()
1888 # instance is created.
1889 return local_caching.DiskContentAddressedCache(
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001890 six.text_type(os.path.abspath(options.cache)), policies, trim, **kwargs)
Marc-Antoine Ruel793bff32019-04-18 17:50:48 +00001891 return local_caching.MemoryContentAddressedCache()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001892
1893
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001894class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
Junji Watanabe38b28b02020-04-23 10:23:30 +00001895
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001896 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001897 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001898 self,
1899 version=__version__,
1900 prog=os.path.basename(sys.modules[__name__].__file__),
1901 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001902 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001903
1904 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001905 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001906 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08001907 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001908 return options, args
1909
1910
1911def main(args):
1912 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04001913 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001914
1915
1916if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07001917 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001918 fix_encoding.fix_encoding()
1919 tools.disable_buffering()
1920 colorama.init()
Takuto Ikuta160b4452020-04-15 06:33:55 +00001921 net.set_user_agent('isolateserver.py/' + __version__)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001922 sys.exit(main(sys.argv[1:]))