#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand
import six
from six.moves import queue as Queue

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files to
# look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
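
# For illustration (a sketch, not executed): the batch sizes grow as the tuple
# is consumed, then stick to its last value:
#
#   n = len(ITEMS_PER_CONTAINS_QUERIES) - 1
#   [ITEMS_PER_CONTAINS_QUERIES[min(i, n)] for i in range(8)]
#   # -> [20, 20, 50, 50, 50, 100, 100, 100]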


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z',
    'avi',
    'cur',
    'gif',
    'h264',
    'jar',
    'jpeg',
    'jpg',
    'mp4',
    'pdf',
    'png',
    'wav',
    'zip',
]

# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
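
# For illustration (a sketch, not executed): with the default blacklist above,
#
#   blacklisted = [re.compile(r) for r in DEFAULT_BLACKLIST]
#   any(r.match('foo.pyc') for r in blacklisted)   # -> True
#   any(r.match('src/.git') for r in blacklisted)  # -> True (POSIX separator)
#   any(r.match('main.cc') for r in blacklisted)   # -> False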


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data

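# Example (a sketch, not executed; the hashlib import is assumed): stream-hash
# a file without reading it fully into memory:
#
#   h = hashlib.sha1()
#   for chunk in file_read(path):
#     h.update(chunk)
#   digest = h.hexdigest()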

def fileobj_path(fileobj):
  """Returns the file system path for a file like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file like object was created by something outside our control
  # (e.g. the standard library's open("test.txt")), name will end up being a
  # str. We want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, six.text_type):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size - written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports that ext2 is known to be potentially faulty under a high
    rate of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # The file is read only when no write bit is set in the mode.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
    assert fs.exists(dstpath)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE:
    # On Windows, any mode change other than removing stat.S_IWRITE is
    # ignored. Return early to skip the slow/unnecessary chmod call.
    return

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)

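# Example (a sketch; |cache_path| and |dest_path| are hypothetical): put a
# read-only cached file into a run directory, hardlinking when the platform
# allows it and silently copying otherwise:
#
#   with fs.open(cache_path, 'rb') as src:
#     putfile(src, dest_path, file_mode=0o500)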

def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

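# Round-trip example (a sketch, not executed): zip_decompress() inverts
# zip_compress():
#
#   compressed = list(zip_compress(iter([b'hello ', b'world'])))
#   assert b''.join(zip_decompress(iter(compressed))) == b'hello world'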

def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # splitext() keeps the leading dot; strip it to match the values in
  # ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7

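# For illustration (not executed): already-compressed content is stored as-is,
# everything else gets level 7:
#
#   _get_zip_compression_level('archive.zip')  # -> 0
#   _get_zip_compression_level('main.cc')      # -> 7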

def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Builds the set of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""
  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()


class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def algo(self):
    return self._algo

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)


class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on the
  fly.
  """

  def __init__(self, root, algo):
    # 2 trailing 512 bytes headers.
    super(TarBundle, self).__init__(size=1024)
    self._items = []
    self._meta = None
    self._algo = algo
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Tries to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should be
    migrated to use the Go code instead.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # https://crbug.com/825418
    return False
    self._size += rounded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Yields tuples (Item, filepath, meta_dict).

    If the bundle contains fewer than 5 items, the individual items are
    yielded instead of the bundle itself.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
    def _tar_thread():
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
        'h': self.digest,
        's': self.size,
        't': u'tar',
    }


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]


class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handler table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
      if sys.maxsize <= max_size:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items, verify_push=False):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes to the first thread. Then it
    join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread                      Thread #1
    (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                                Thread #2
          |         |
          v         v
      (missing)  (was on server)
          |
          v
    _handle_missing_thread                            Thread #3
          |
          v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
        upload.
      verify_push: verify files are uploaded correctly by fetching them back
        from the server.

    Returns:
      List of items that were uploaded. All other items are already there.

    Raises:
      The first exception raised in the worker threads.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []
    exc_channel = threading_utils.TaskChannel()

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.items():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().items():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      try:
        with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
          channel = threading_utils.TaskChannel()
          pending_upload = 0
          while not self._aborted:
            try:
              missing_item, push_state = missing.get(True, timeout=5)
              if missing_item is None:
                break
              self._async_push(channel, missing_item, push_state, verify_push)
              pending_upload += 1
            except Queue.Empty:
              pass
            detector.ping()
            while not self._aborted and pending_upload:
              try:
                item = channel.next(timeout=0)
              except threading_utils.TaskChannel.Timeout:
                break
              uploaded.append(item)
              pending_upload -= 1
              logging.debug('Uploaded %d; %d pending: %s (%d)', len(uploaded),
                            pending_upload, item.digest, item.size)
          while not self._aborted and pending_upload:
            item = channel.next()
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
      except Exception:
        exc_channel.send_exception()

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point
      # of view of the isolate server (it doesn't care about paths at all,
      # only content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. That is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()
      exc_channel.send_done()
      for _ in exc_channel:
        # If there is no exception, this loop does nothing. Otherwise, it
        # raises the first exception put onto |exc_channel|.
        pass

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded

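  # Example of use (a sketch; |storage| is assumed to come from get_storage()
  # and |paths| to be a list of files on disk):
  #
  #   items = (FileItem(p, storage.server_ref.hash_algo) for p in paths)
  #   uploaded = storage.upload_items(items)
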
  def _async_push(self, channel, item, push_state, verify_push=False):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
        storage specific information describing how to upload the item (for
        example in case of cloud storage, it is signed upload URLs).
      verify_push: verify files are uploaded correctly by fetching them back
        from the server.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      if verify_push:
        try:
          self._fetch(
              item.digest,
              item.size,
              # This consumes all elements from the given generator.
              lambda gen: collections.deque(gen, maxlen=0))
        except Exception:
          # Reset push_state if the upload failed to verify.
          push_state.finalized = False
          push_state.uploaded = False
          raise

      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        # In Python3, zlib.compress returns a byte object instead of str.
        data = six.b('').join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
        storage specific information describing how to upload the item (for
        example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def _fetch(self, digest, size, sink):
    try:
      # Prepare reading pipeline.
      stream = self._storage_api.fetch(digest, size, 0)
      if self.server_ref.is_with_compression:
        stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
      # Run |stream| through verifier that will assert its size.
      verifier = FetchStreamVerifier(stream, self.server_ref.hash_algo, digest,
                                     size)
      # Verified stream goes to |sink|.
      sink(verifier.run())
    except Exception:
      logging.exception('Failed to fetch %s', digest)
      raise

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts an asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      self._fetch(digest, size, sink)
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use; verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves. In the first wave it is called for
    HIGH priority items, the isolated files themselves. In the second wave it
    is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to the assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns the number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)

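# Example of use (a sketch; |storage|, |digest| and |size| are hypothetical,
# and the in-memory cache is assumed to implement local_caching's
# ContentAddressedCache interface):
#
#   cache = local_caching.MemoryContentAddressedCache()
#   fetch_queue = FetchQueue(storage, cache)
#   fetch_queue.wait_on(digest)
#   fetch_queue.add(digest, size, threading_utils.PRIORITY_HIGH)
#   assert fetch_queue.wait() == digest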

class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions
    since otherwise Storage will retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)

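# Example (a sketch, not executed; the hashlib import is assumed): a verifier
# yields the stream unchanged and raises IOError on a size or digest mismatch:
#
#   payload = b'hello'
#   verifier = FetchStreamVerifier(
#       iter([payload]), hashlib.sha1,
#       hashlib.sha1(payload).hexdigest(), len(payload))
#   assert b''.join(verifier.run()) == payload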
1127
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001128class IsolatedBundle(object):
1129 """Fetched and parsed .isolated file with all dependencies."""
1130
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001131 def __init__(self, filter_cb):
1132 """
1133 filter_cb: callback function to filter downloaded content.
1134 When filter_cb is not None, Isolated file is downloaded iff
1135 filter_cb(filepath) returns True.
1136 """
1137
Vadim Shtayura3148e072014-09-02 18:51:52 -07001138 self.command = []
1139 self.files = {}
Vadim Shtayura3148e072014-09-02 18:51:52 -07001140 self.relative_cwd = None
1141 # The main .isolated file, a IsolatedFile instance.
1142 self.root = None
1143
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001144 self._filter_cb = filter_cb
1145
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001146 def fetch(self, fetch_queue, root_isolated_hash, algo):
1147 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001148
1149 It enables support for "included" .isolated files. They are processed in
1150 strict order but fetched asynchronously from the cache. This is important so
1151 that a file in an included .isolated file that is overridden by an embedding
1152 .isolated file is not fetched needlessly. The includes are fetched in one
1153 pass and the files are fetched as soon as all the ones on the left-side
1154 of the tree were fetched.
1155
1156 The prioritization is very important here for nested .isolated files.
1157 'includes' have the highest priority and the algorithm is optimized for both
1158 deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the
    elements in 'includes' is important.

    As a side effect this method starts an asynchronous fetch of all data
    files by adding them to |fetch_queue|. It doesn't wait for the data files
    to finish fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      """Retrieves an isolated file included by the root bundle."""
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      # This isolated item is being added dynamically, notify FetchQueue.
      fetch_queue.wait_on(h)
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching the root *.isolated file (a single file, not the whole
    # bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, then parse it.
      item_hash = fetch_queue.wait()
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, which waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # By now all *.isolated files, and only them, should have been processed.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

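  # A small worked example of the traversal above, for a hypothetical include
  # graph: if the root's 'includes' lists A then B, and A itself includes C,
  # walk_includes() visits root, A, C, then B. Earlier-visited nodes win, so
  # files in root override same-named files from A, C and B.
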
  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.items():
      if self._filter_cb and not self._filter_cb(filepath):
        continue

      # The root isolated has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on Windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def get_storage(server_ref):
  """Returns a Storage instance for uploads and downloads via |server_ref|.

  Arguments:
    server_ref: isolate_storage.ServerRef instance.

  Returns:
    Instance of Storage.
  """
  # Handle the specific internal use case.
  assert (isinstance(server_ref, isolate_storage.ServerRef) or
          type(server_ref).__name__ == 'ServerRef'), repr(server_ref)
  return Storage(isolate_storage.get_storage_api(server_ref))


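# A minimal usage sketch of get_storage() (hypothetical server URL; the
# namespace default used by the option parser below is 'default-gzip'):
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     ...  # e.g. archive_files_to_storage(storage, files, blacklist)

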
def _map_file(dst, digest, props, cache, use_symlinks):
  """Puts a downloaded file at its destination path.

  This function is used for multi-threaded file mapping.
  """
  with tools.Profiler("_map_file for %s" % dst):
    with cache.getfileobj(digest) as srcfileobj:
      filetype = props.get('t', 'basic')

      if filetype == 'basic':
        # Ignore all bits apart from the user.
        file_mode = (props.get('m') or 0o500) & 0o700
        putfile(srcfileobj, dst, file_mode, use_symlink=use_symlinks)

      elif filetype == 'tar':
        basedir = os.path.dirname(dst)
        with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
          ensured_dirs = set()
          for ti in t:
            if not ti.isfile():
              logging.warning('Path(%r) is nonfile (%s), skipped', ti.name,
                              ti.type)
              continue
            # Handle files created on Windows fetched on POSIX and the
            # reverse.
            other_sep = '/' if os.path.sep == '\\' else '\\'
            name = ti.name.replace(other_sep, os.path.sep)
            fp = os.path.normpath(os.path.join(basedir, name))
            if not fp.startswith(basedir):
              logging.error('Path(%r) is outside root directory', fp)
              # Do not extract entries that would escape the destination.
              continue
            ifd = t.extractfile(ti)
            fp_dir = os.path.dirname(fp)
            if fp_dir not in ensured_dirs:
              file_path.ensure_tree(fp_dir)
              ensured_dirs.add(fp_dir)
            file_mode = ti.mode & 0o700
            putfile(ifd, fp, file_mode, ti.size)

      else:
        raise isolated_format.IsolatedError('Unknown file type %r' % filetype)


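# For reference, a 'files' entry handled by _map_file() above looks roughly
# like this in the .isolated JSON (hypothetical digest; 'm' and 't' are
# optional, and 't' defaults to 'basic'):
#
#   'dir/file.txt': {'h': 'da39a3ee5e6b...', 's': 123, 'm': 420, 't': 'basic'}

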
def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
                   filter_cb=None):
  """Aggressively downloads the .isolated file(s), then downloads all files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage class that communicates with isolate storage.
    cache: ContentAddressedCache class that knows how to store and map files
           locally.
    outdir: Output directory to map the file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.
    filter_cb: optional predicate; when set, only files for which it returns
               True are downloaded (acts as a whitelist).

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.server_ref.hash_algo
  fetch_queue = FetchQueue(storage, cache)
  bundle = IsolatedBundle(filter_cb)

  with tools.Profiler('GetIsolateds'):
    # Optionally support local files by manually adding them to cache.
    if not isolated_format.is_valid_hash(isolated_hash, algo):
      logging.debug(
          '%s is not a valid hash, assuming a file '
          '(algo was %s, hash size was %d)', isolated_hash, algo(),
          algo().digest_size)
      path = six.text_type(os.path.abspath(isolated_hash))
      try:
        isolated_hash = fetch_queue.inject_local_file(path, algo)
      except IOError as e:
        raise isolated_format.MappingError(
            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
            'valid hash (error: %s)?' % (isolated_hash, e))

    # Load all *.isolated files and start loading the rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)

  with tools.Profiler('GetRest'):
    # Create the file system hierarchy.
    file_path.ensure_tree(outdir)
    create_directories(outdir, bundle.files)
    _create_symlinks(outdir, bundle.files.items())

    # Ensure the working directory exists.
    cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
    file_path.ensure_tree(cwd)

    # Multimap: digest -> list of pairs (path, props).
    remaining = {}
    for filepath, props in bundle.files.items():
      if 'h' in props:
        remaining.setdefault(props['h'], []).append((filepath, props))
        fetch_queue.wait_on(props['h'])

    # Now block on the remaining files to be downloaded and mapped.
    logging.info('Retrieving remaining files (%d of them)...',
                 fetch_queue.pending_count)
    last_update = time.time()

    with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait()

          # Create the files in the destination using the item in cache as
          # the source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
                                         _map_file, fullpath, digest, props,
                                         cache, use_symlinks)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            sys.stdout.write(msg + '\n')
            sys.stdout.flush()
            logging.info(msg)
            last_update = time.time()
      assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
      putfile_thread_pool.join()

    # Save the cache right away so as not to lose the state of the new
    # objects.
    cache.save()
    # The cache could have evicted some items we just tried to fetch; that's a
    # fatal error.
    if not fetch_queue.verify_all_cached():
      free_disk = file_path.get_free_space(cache.cache_dir)
      msg = (
          'Cache is too small to hold all requested files.\n'
          '  %s\n  cache=%d bytes, %d items; %s bytes free') % (
              cache.policies, cache.total_size, len(cache), free_disk)
      raise isolated_format.MappingError(msg)
  return bundle


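# A minimal usage sketch of fetch_isolated() above (hypothetical server URL,
# hash and output directory; see CMDdownload for the real call site):
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     cache = local_caching.MemoryContentAddressedCache()
#     bundle = fetch_isolated(
#         isolated_hash='deadbeef' * 5, storage=storage, cache=cache,
#         outdir=u'/tmp/out', use_symlinks=False)

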
def _directory_to_metadata(root, algo, blacklist):
  """Yields every file and/or symlink found.

  Yields:
    tuple(FileItem, relpath, metadata)
    For a symlink, FileItem is None.
  """
  # Current tar file bundle, if any.
  root = file_path.get_native_path_case(root)
  bundle = TarBundle(root, algo)
  for relpath, issymlink in isolated_format.expand_directory_and_symlink(
      root,
      u'.' + os.path.sep,
      blacklist,
      follow_symlinks=(sys.platform != 'win32')):

    filepath = os.path.join(root, relpath)
    if issymlink:
      # TODO(maruel): Do not call this.
      meta = isolated_format.file_to_metadata(filepath, False)
      yield None, relpath, meta
      continue

    prio = relpath.endswith('.isolated')
    if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
      # The file was added to the current pending tarball and won't be
      # archived individually.
      continue

    # Flush and reset the bundle.
    for i, p, m in bundle.yield_item_path_meta():
      yield i, p, m
    bundle = TarBundle(root, algo)

    # Yield the file individually.
    item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
    yield item, relpath, item.meta

  for i, p, m in bundle.yield_item_path_meta():
    yield i, p, m


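# A minimal consumption sketch for _directory_to_metadata() above
# (hypothetical directory; hashlib.sha1 stands in for the namespace's hash
# class):
#
#   for item, relpath, _meta in _directory_to_metadata(
#       u'/path/to/dir', hashlib.sha1, tools.gen_blacklist([])):
#     if item:  # item is None for symlinks.
#       print('%s %s' % (item.digest, relpath))

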
def _print_upload_stats(items, missing):
  """Prints upload stats."""
  total = len(items)
  total_size = sum(f.size for f in items)
  logging.info(
      'Total: %6d, %9.1fkiB', total, total_size / 1024.)
  cache_hit = set(items).difference(missing)
  cache_hit_size = sum(f.size for f in cache_hit)
  logging.info(
      'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_hit),
      cache_hit_size / 1024.,
      len(cache_hit) * 100. / total if total else 0,
      cache_hit_size * 100. / total_size if total_size else 0)
  cache_miss = missing
  cache_miss_size = sum(f.size for f in cache_miss)
  logging.info('cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
               len(cache_miss), cache_miss_size / 1024.,
               len(cache_miss) * 100. / total if total else 0,
               cache_miss_size * 100. / total_size if total_size else 0)


def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
  """Called by archive_files_to_storage for a directory.

  Creates an .isolated file describing the directory's contents.

  Yields:
    FileItem for every file found, plus one for the .isolated file itself.
  """
  files = {}
  for item, relpath, meta in _directory_to_metadata(
      dirpath, hash_algo, blacklist):
    # item is None for a symlink.
    files[relpath] = meta
    if item:
      yield item

  # TODO(maruel): If there's no file, don't yield an .isolated file.
  data = {
      'algo': hash_algo_name,
      'files': files,
      'version': isolated_format.ISOLATED_FILE_VERSION,
  }
  # Keep the file in memory. This is fine because .isolated files are
  # relatively small.
  yield BufferItem(
      tools.format_json(data, True), algo=hash_algo, high_priority=True)


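# The BufferItem yielded above wraps a small JSON document, roughly like this
# (hypothetical digest, abbreviated):
#
#   {
#     "algo": "sha-1",
#     "files": {"a/b.txt": {"h": "da39a3...", "s": 123}},
#     "version": <isolated_format.ISOLATED_FILE_VERSION>
#   }

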
def _archive_files_to_storage_internal(storage,
                                       files,
                                       blacklist,
                                       verify_push=False):
  """Stores every entry into remote storage and returns stats.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: iterable of files to upload. If a directory is specified (with a
        trailing slash), a .isolated file is created and its hash is returned.
        Duplicates are skipped.
    blacklist: function that returns True if a file should be omitted.
    verify_push: verify files are uploaded correctly by fetching from server.

  Returns:
    tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the .isolated file.

  Raises:
    Re-raises the exception in upload_items(), if there is any.
  """
  # Dict of path to hash.
  results = collections.OrderedDict()
  hash_algo = storage.server_ref.hash_algo
  hash_algo_name = storage.server_ref.hash_algo_name
  # Generator of FileItem to pass to the upload_items() concurrent operation.
  channel = threading_utils.TaskChannel()
  exc_channel = threading_utils.TaskChannel()
  uploaded_digests = set()

  def _upload_items():
    try:
      # |uploaded| is the list of items that were actually pushed.
      uploaded = storage.upload_items(channel, verify_push)
      uploaded_digests.update(f.digest for f in uploaded)
    except Exception:
      exc_channel.send_exception()

  t = threading.Thread(target=_upload_items)
  t.start()

  # Keep track locally of the items to determine cold and hot items.
  items_found = []
  try:
    for f in files:
      assert isinstance(f, six.text_type), repr(f)
      if f in results:
        # Duplicate.
        continue
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          item = None
          for item in _enqueue_dir(
              filepath, blacklist, hash_algo, hash_algo_name):
            channel.send_result(item)
            items_found.append(item)
          # The very last item will be the .isolated file.
          if not item:
            # There was no file in the directory.
            continue
        elif fs.isfile(filepath):
          item = FileItem(
              path=filepath,
              algo=hash_algo,
              size=None,
              high_priority=f.endswith('.isolated'))
          channel.send_result(item)
          items_found.append(item)
        else:
          raise Error('%s is neither a file nor a directory.' % f)
        results[f] = item.digest
      except OSError:
        raise Error('Failed to process %s.' % f)
  finally:
    # Stops the generator, so _upload_items() can exit.
    channel.send_done()
    t.join()
    exc_channel.send_done()

  try:
    for _ in exc_channel:
      pass
  except Exception:
    # Log the items when uploading failed.
    for item in items_found:
      if isinstance(item, FileItem):
        logging.error('FileItem path: %s, digest:%s, re-calculated digest:%s',
                      item.path, item.digest,
                      isolated_format.hash_file(item.path, item.algo))
        continue

      logging.error('Item digest:%s', item.digest)

    raise

  cold = []
  hot = []
  for i in items_found:
    # Note that multiple FileItem may have the same .digest.
    if i.digest in uploaded_digests:
      cold.append(i)
    else:
      hot.append(i)
  return results, cold, hot


# TODO(crbug.com/1073832):
# remove this if the process leak in the coverage build is fixed.
def archive_files_to_storage(storage, files, blacklist, verify_push=False):
  """Calls _archive_files_to_storage_internal with retries.

  Arguments:
    See the Arguments section in _archive_files_to_storage_internal.

  Returns:
    See the Returns section in _archive_files_to_storage_internal.

  Raises:
    Re-raises the exception in _archive_files_to_storage_internal if all
    retries failed.
  """

  # Will do exponential backoff.
  # e.g. 10, 20, 40, 80 seconds.
  backoff = 10

  while True:
    try:
      return _archive_files_to_storage_internal(storage, files, blacklist,
                                                verify_push)
    except Exception:
      if backoff > 100:
        raise

      on_error.report('error before %d second backoff' % backoff)

      logging.exception(
          'failed to run _archive_files_to_storage_internal,'
          ' will retry after %d seconds', backoff)
      time.sleep(backoff)
      backoff *= 2


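# A minimal usage sketch of archive_files_to_storage() (hypothetical server
# URL and paths), mirroring what CMDarchive below does:
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     results, _cold, _hot = archive_files_to_storage(
#         storage, [u'/path/to/dir', u'/path/to/file.txt'],
#         tools.gen_blacklist(DEFAULT_BLACKLIST))
#   for path, digest in results.items():
#     print('%s %s' % (digest, path))

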
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
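  # Example invocation (hypothetical server URL):
  #   isolateserver.py archive -I https://isolate.example.com some_directory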
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  if files == ['-']:
    files = (l.rstrip('\n\r') for l in sys.stdin)
  if not files:
    parser.error('Nothing to upload')
  # Decode only when needed; on Python 3 the paths are already text.
  files = (f.decode('utf-8') if isinstance(f, six.binary_type) else f
           for f in files)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(server_ref) as storage:
      results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
  except (Error, local_caching.NoMoreSpace) as e:
    parser.error(e.args[0])
  print('\n'.join('%s %s' % (h, f) for f, h in results.items()))
  return 0


def CMDdownload(parser, args):
  """Downloads data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
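  # Example invocation (hypothetical server URL and hash):
  #   isolateserver.py download -I https://isolate.example.com \
  #       -s deadbeefdeadbeefdeadbeefdeadbeefdeadbeef -t out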
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, '
           'use --file if you need it')
  parser.add_option(
      '-f',
      '--file',
      metavar='HASH DEST',
      default=[],
      action='append',
      nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t',
      '--target',
      metavar='DIR',
      default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks',
      action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.warning('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = six.text_type(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  with get_storage(server_ref) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = six.text_type(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel, threading_utils.PRIORITY_MED, digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(local_caching.file_write,
                              os.path.join(options.target, dest)))
      while pending:
        fetched = channel.next()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching the whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0


def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexps to use as a blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace',
      default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(parser, options, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)

  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))
  return None


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR', default='cache',
      help='Directory to keep a local cache of the files. Accelerates '
           'download by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, trim, **kwargs):
  if options.cache:
    policies = local_caching.CachePolicies(
        options.max_cache_size,
        options.min_free_space,
        options.max_items,
        # 3 weeks.
        max_age_secs=21 * 24 * 60 * 60)

    # |options.cache| path may not exist until DiskContentAddressedCache()
    # instance is created.
    return local_caching.DiskContentAddressedCache(
        six.text_type(os.path.abspath(options.cache)), policies, trim,
        **kwargs)
  return local_caching.MemoryContentAddressedCache()


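# A minimal usage sketch tying the cache options above together (assumes
# |options| came from a parser that had add_cache_options() applied):
#
#   cache = process_cache_options(options, trim=True)
#   cache.cleanup()

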
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):

  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  net.set_user_agent('isolateserver.py/' + __version__)
  sys.exit(main(sys.argv[1:]))