#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

from __future__ import print_function

import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import net
from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand
import six
from six.moves import queue as Queue

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils


__version__ = '0.9.0'

# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to the log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# larger the possibility it has changed). Then the first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/pre-upload',
# then the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are
# a trade-off; the more per request, the lower the effect of HTTP round trip
# latency and TCP-level chattiness. On the other hand, larger values cause
# longer lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousand files to look up with a minimal number of large files missing"
# case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z',
    'avi',
    'cur',
    'gif',
    'h264',
    'jar',
    'jpeg',
    'jpg',
    'mp4',
    'pdf',
    'png',
    'wav',
    'zip',
]

# The delay (in seconds) to wait between logging statements when retrieving the
# required files. This is intended to let the user know that the program is
# still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_DENYLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
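
# For illustration (assuming a POSIX path separator): with the patterns above,
# 'foo.pyc', '.git' and 'src/.svn' are denied, while 'foo.py' and 'gitignored'
# are not.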


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data
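
# A minimal usage sketch (hypothetical path; file_read() is a generator, so
# nothing is read until iteration starts):
#   for chunk in file_read('/tmp/archive.bin', chunk_size=64 * 1024):
#     hasher.update(chunk)  # 'hasher' being some hashlib object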


def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file-like object was created by code outside our control (e.g. the
  # standard library's open('test.txt')), name will end up being a str. We
  # want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, six.text_type):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None
tansell9e04a8d2016-07-28 09:31:59 -0700160
161
162# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
163# wrappers have been created.
164def fileobj_copy(
165 dstfileobj, srcfileobj, size=-1,
166 chunk_size=isolated_format.DISK_FILE_CHUNK):
167 """Copy data from srcfileobj to dstfileobj.
168
169 Providing size means exactly that amount of data will be copied (if there
170 isn't enough data, an IOError exception is thrown). Otherwise all data until
171 the EOF marker will be copied.
172 """
173 if size == -1 and hasattr(srcfileobj, 'tell'):
174 if srcfileobj.tell() != 0:
175 raise IOError('partial file but not using size')
176
177 written = 0
178 while written != size:
179 readsize = chunk_size
180 if size > 0:
181 readsize = min(readsize, size-written)
182 data = srcfileobj.read(readsize)
183 if not data:
184 if size == -1:
185 break
186 raise IOError('partial file, got %s, wanted %s' % (written, size))
187 dstfileobj.write(data)
188 written += len(data)
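
# Contract sketch with in-memory buffers (io.BytesIO is file-like enough):
#   import io
#   src, dst = io.BytesIO(b'x' * 10), io.BytesIO()
#   fileobj_copy(dst, src, size=10)   # copies exactly 10 bytes
#   fileobj_copy(dst, src)            # raises IOError: src is no longer at 0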


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and thus disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports that ext2 is known to be potentially faulty on high
    rates of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
    assert fs.exists(dstpath)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE:
    # On Windows, any mode change other than removing stat.S_IWRITE is
    # ignored. Return early to skip the slow/unnecessary chmod call.
    return

  # file_mode of 0 is actually valid, so an explicit check is needed.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
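
# Sketch (hypothetical paths): stream an already-open cache entry into place:
#   with fs.open('/cache/deadbeef', 'rb') as src:
#     putfile(src, '/work/out.bin', file_mode=0o500)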


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
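
# The two generators compose into a round-trip, e.g. (sketch):
#   compressed = b''.join(zip_compress([b'hello '] * 1000))
#   assert b''.join(zip_decompress([compressed])) == b'hello ' * 1000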


def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot; strip it so the extension
  # matches the entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
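
# For illustration: 'video.mp4' and 'archive.zip' map to level 0 (already
# compressed), while 'main.cpp' or 'data.json' map to level 7.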


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)
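
# Sketch (hypothetical layout): given files ['a/b/c.h', 'a/d.cc'], this
# creates base_directory/a then base_directory/a/b; the sorted() walk
# guarantees parents are created before children.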


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""
  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()


class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def algo(self):
    return self._algo

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)
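
# Construction sketch (hypothetical path; any hashlib constructor works):
#   import hashlib
#   item = FileItem('/tmp/data.bin', hashlib.sha1)
#   item.digest  # hashes the file lazily on first access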
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000406
407
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +0000408class TarBundle(isolate_storage.Item):
409 """Tarfile to push to Storage.
410
411 Its digest is the digest of all the files it contains. It is generated on the
412 fly.
413 """
414
415 def __init__(self, root, algo):
416 # 2 trailing 512 bytes headers.
417 super(TarBundle, self).__init__(size=1024)
418 self._items = []
419 self._meta = None
420 self._algo = algo
421 self._root_len = len(root) + 1
422 # Same value as for Go.
423 # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
424 # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
425 self._archive_max_size = int(10e6)
426
427 @property
428 def digest(self):
429 if not self._digest:
430 self._prepare()
431 return self._digest
432
433 @property
434 def size(self):
435 if self._size is None:
436 self._prepare()
437 return self._size
438
439 def try_add(self, item):
440 """Try to add this file to the bundle.
441
442 It is extremely naive but this should be just enough for
443 https://crbug.com/825418.
444
445 Future improvements should be in the Go code, and the Swarming bot should be
446 migrated to use the Go code instead.
447 """
448 if not item.size:
449 return False
450 # pylint: disable=unreachable
451 rounded = (item.size + 512) & ~511
452 if rounded + self._size > self._archive_max_size:
453 return False
454 # https://crbug.com/825418
455 return False
456 self._size += rounded
457 self._items.append(item)
458 return True
459
460 def yield_item_path_meta(self):
461 """Returns a tuple(Item, filepath, meta_dict).
462
463 If the bundle contains less than 5 items, the items are yielded.
464 """
465 if len(self._items) < 5:
466 # The tarball is too small, yield individual items, if any.
467 for item in self._items:
468 yield item, item.path[self._root_len:], item.meta
469 else:
470 # This ensures self._meta is set.
471 p = self.digest + '.tar'
472 # Yield itself as a tarball.
473 yield self, p, self._meta
474
475 def content(self):
476 """Generates the tarfile content on the fly."""
477 obj = _ThreadFile()
478 def _tar_thread():
479 try:
480 t = tarfile.open(
481 fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
482 for item in self._items:
483 logging.info(' tarring %s', item.path)
484 t.add(item.path)
485 t.close()
486 except Exception:
487 logging.exception('Internal failure')
488 finally:
489 obj.close()
490
491 t = threading.Thread(target=_tar_thread)
492 t.start()
493 try:
494 for data in obj:
495 yield data
496 finally:
497 t.join()
498
499 def _prepare(self):
500 h = self._algo()
501 total = 0
502 for chunk in self.content():
503 h.update(chunk)
504 total += len(chunk)
505 # pylint: disable=attribute-defined-outside-init
506 # This is not true, they are defined in Item.__init__().
507 self._digest = h.hexdigest()
508 self._size = total
509 self._meta = {
510 'h': self.digest,
511 's': self.size,
512 't': u'tar',
513 }


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]
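
# Unlike FileItem, the digest is computed eagerly in the constructor, e.g.:
#   import hashlib
#   item = BufferItem(b'{"version": 1}', hashlib.sha1, high_priority=True)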
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000528
529
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000530class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800531 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000532
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800533 Implements compression support, parallel 'contains' checks, parallel uploads
534 and more.
535
536 Works only within single namespace (and thus hashing algorithm and compression
537 scheme are fixed).
538
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400539 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
540 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800541 """
542
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700543 def __init__(self, storage_api):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000544 self._storage_api = storage_api
545 self._cpu_thread_pool = None
546 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400547 self._aborted = False
548 self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000549
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000550 @property
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000551 def server_ref(self):
552 """Shortcut to get the server_ref from storage_api.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700553
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000554 This can be used to get the underlying hash_algo.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700555 """
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000556 return self._storage_api.server_ref
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700557
558 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000559 def cpu_thread_pool(self):
560 """ThreadPool for CPU-bound tasks like zipping."""
561 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500562 threads = max(threading_utils.num_processors(), 2)
Lei Leife202df2019-06-11 17:33:34 +0000563 max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
564 if sys.maxsize <= max_size:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500565 # On 32 bits userland, do not try to use more than 16 threads.
566 threads = min(threads, 16)
567 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000568 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000569
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000570 @property
571 def net_thread_pool(self):
572 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
573 if self._net_thread_pool is None:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700574 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000575 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000576
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000577 def close(self):
578 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400579 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000580 if self._cpu_thread_pool:
581 self._cpu_thread_pool.join()
582 self._cpu_thread_pool.close()
583 self._cpu_thread_pool = None
584 if self._net_thread_pool:
585 self._net_thread_pool.join()
586 self._net_thread_pool.close()
587 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400588 logging.info('Done.')
589
590 def abort(self):
591 """Cancels any pending or future operations."""
592 # This is not strictly theadsafe, but in the worst case the logging message
593 # will be printed twice. Not a big deal. In other places it is assumed that
594 # unprotected reads and writes to _aborted are serializable (it is true
595 # for python) and thus no locking is used.
596 if not self._aborted:
597 logging.warning('Aborting... It can take a while.')
598 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000599
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000600 def __enter__(self):
601 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400602 assert not self._prev_sig_handlers, self._prev_sig_handlers
603 for s in (signal.SIGINT, signal.SIGTERM):
604 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000605 return self
606
607 def __exit__(self, _exc_type, _exc_value, _traceback):
608 """Context manager interface."""
609 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400610 while self._prev_sig_handlers:
611 s, h = self._prev_sig_handlers.popitem()
612 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000613 return False

  def upload_items(self, items, verify_push=False):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes to the first thread. Then it
    join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread                      Thread #1
    (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                                Thread #2
          |         |
          v         v
    (missing)  (was on server)
          |
          v
    _handle_missing_thread                            Thread #3
          |
          v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
          upload.
      verify_push: verify files are uploaded correctly by fetching them back
          from the server.

    Returns:
      List of items that were uploaded. All other items are already there.

    Raises:
      The first exception raised in the worker threads.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []
    exc_channel = threading_utils.TaskChannel()

    def _create_items_batches_thread():
      """Creates batches for the /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.items():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().items():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      try:
        with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
          channel = threading_utils.TaskChannel()
          pending_upload = 0
          while not self._aborted:
            try:
              missing_item, push_state = missing.get(True, timeout=5)
              if missing_item is None:
                break
              self._async_push(channel, missing_item, push_state, verify_push)
              pending_upload += 1
            except Queue.Empty:
              pass
            detector.ping()
            while not self._aborted and pending_upload:
              try:
                item = channel.next(timeout=0)
              except threading_utils.TaskChannel.Timeout:
                break
              uploaded.append(item)
              pending_upload -= 1
              logging.debug('Uploaded %d; %d pending: %s (%d)', len(uploaded),
                            pending_upload, item.digest, item.size)
          while not self._aborted and pending_upload:
            item = channel.next()
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
      except Exception:
        exc_channel.send_exception()

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point
      # of view of the isolate server (it doesn't care about paths at all,
      # only content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. That is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()
      exc_channel.send_done()
      for _ in exc_channel:
        # If there is no exception, this loop does nothing. Otherwise, it
        # raises the first exception put onto |exc_channel|.
        pass

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded
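
  # Typical use, as a sketch (hypothetical server URL and namespace):
  #   server_ref = isolate_storage.ServerRef(
  #       'https://isolate.example.com', 'default-gzip')
  #   with get_storage(server_ref) as storage:
  #     items = [FileItem(p, server_ref.hash_algo) for p in paths]
  #     uploaded = storage.upload_items(items)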

  def _async_push(self, channel, item, push_state, verify_push=False):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).
      verify_push: verify files are uploaded correctly by fetching them back
          from the server.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      if verify_push:
        try:
          self._fetch(
              item.digest,
              item.size,
              # This consumes all elements from the given generator.
              lambda gen: collections.deque(gen, maxlen=0))
        except Exception:
          # Reset push_state if we failed to verify.
          push_state.finalized = False
          push_state.uploaded = False
          raise
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        # In Python3, zlib.compress returns a bytes object instead of str.
        data = six.b('').join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def _fetch(self, digest, size, sink):
    try:
      # Prepare reading pipeline.
      stream = self._storage_api.fetch(digest, size, 0)
      if self.server_ref.is_with_compression:
        stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
      # Run |stream| through verifier that will assert its size.
      verifier = FetchStreamVerifier(stream, self.server_ref.hash_algo, digest,
                                     size)
      # Verified stream goes to |sink|.
      sink(verifier.run())
    except Exception:
      logging.exception('Failed to fetch %s', digest)
      raise

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      self._fetch(digest, size, sink)
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
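
  # Fetch-into-memory sketch (hypothetical digest and size; next() blocks
  # until the download completed):
  #   chunks = []
  #   channel = threading_utils.TaskChannel()
  #   storage.async_fetch(channel, threading_utils.PRIORITY_MED, digest, size,
  #                       lambda gen: chunks.extend(gen))
  #   assert channel.next() == digest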


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, the item is removed from the waited-on set and
    returned.

    This function is called in two waves. In the first wave it is called for
    HIGH priority items, the isolated files themselves. In the second wave it
    is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)
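
  # Wiring sketch (hypothetical digest): FetchQueue pipelines Storage into a
  # local cache:
  #   fetch_queue = FetchQueue(storage, cache)
  #   fetch_queue.wait_on(root_hash)
  #   fetch_queue.add(root_hash, priority=threading_utils.PRIORITY_HIGH)
  #   done = fetch_queue.wait()  # returns root_hash once it is cached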


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
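
  # Self-contained sketch of the contract:
  #   import hashlib
  #   blob = b'abc'
  #   v = FetchStreamVerifier(iter([blob]), hashlib.sha1,
  #                           hashlib.sha1(blob).hexdigest(), len(blob))
  #   assert b''.join(v.run()) == blob  # raises IOError on any mismatch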


class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self, filter_cb):
    """
    filter_cb: callback function to filter downloaded content.
        When filter_cb is not None, a file is downloaded iff
        filter_cb(filepath) returns True.
    """

    self.command = []
    self.files = {}
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

    self._filter_cb = filter_cb

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated files.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long link of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      """Retrieves an isolated file included by the root bundle."""
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      # This isolated item is being added dynamically, notify FetchQueue.
      fetch_queue.wait_on(h)
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait()
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, which waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now, and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.items():
      if self._filter_cb and not self._filter_cb(filepath):
        continue

      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1264
1265
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001266def get_storage(server_ref):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001267 """Returns a Storage instance that can upload to and download from |server_ref|.
1268
1269 Arguments:
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001270 server_ref: isolate_storage.ServerRef instance.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001271
1272 Returns:
1273 Instance of Storage.
1274 """
Lei Lei73a5f732020-03-23 20:36:14 +00001275 # Handle the specific internal use case.
1276 assert (isinstance(server_ref, isolate_storage.ServerRef) or
1277 type(server_ref).__name__ == 'ServerRef'), repr(server_ref)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001278 return Storage(isolate_storage.get_storage_api(server_ref))
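# Typical use, as in CMDarchive/CMDdownload below (the URL here is a
# hypothetical example):
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     ...  # e.g. storage.upload_items() or storage.async_fetch().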
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001279
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001280
Takuto Ikutaa5f12c52020-06-11 06:52:01 +00001281def _map_file(dst, digest, props, cache, use_symlinks):
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001282 """Puts a downloaded file at its destination path. This function is used
1283 for multi-threaded file placement.
1284 """
Takuto Ikuta523c6472019-09-18 02:53:34 +00001285 with tools.Profiler("_map_file for %s" % dst):
1286 with cache.getfileobj(digest) as srcfileobj:
1287 filetype = props.get('t', 'basic')
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001288
Takuto Ikuta523c6472019-09-18 02:53:34 +00001289 if filetype == 'basic':
1290 # Ignore all bits apart from the user.
1291 file_mode = (props.get('m') or 0o500) & 0o700
Takuto Ikuta523c6472019-09-18 02:53:34 +00001292 putfile(srcfileobj, dst, file_mode, use_symlink=use_symlinks)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001293
Takuto Ikuta523c6472019-09-18 02:53:34 +00001294 elif filetype == 'tar':
1295 basedir = os.path.dirname(dst)
1296 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1297 ensured_dirs = set()
1298 for ti in t:
1299 if not ti.isfile():
1300 logging.warning('Path(%r) is nonfile (%s), skipped', ti.name,
1301 ti.type)
1302 continue
1303 # Handle files created on Windows fetched on POSIX and the
1304 # reverse.
1305 other_sep = '/' if os.path.sep == '\\' else '\\'
1306 name = ti.name.replace(other_sep, os.path.sep)
1307 fp = os.path.normpath(os.path.join(basedir, name))
1308 if not fp.startswith(basedir):
1309 logging.error('Path(%r) is outside root directory, skipped', fp)
# Skip entries that would escape |basedir| (path traversal guard).
continue
1310 ifd = t.extractfile(ti)
1311 fp_dir = os.path.dirname(fp)
1312 if fp_dir not in ensured_dirs:
1313 file_path.ensure_tree(fp_dir)
1314 ensured_dirs.add(fp_dir)
1315 file_mode = ti.mode & 0o700
Takuto Ikuta523c6472019-09-18 02:53:34 +00001316 putfile(ifd, fp, file_mode, ti.size)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001317
Takuto Ikuta523c6472019-09-18 02:53:34 +00001318 else:
1319 raise isolated_format.IsolatedError('Unknown file type %r' % filetype)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001320
1321
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001322def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1323 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001324 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001325
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001326 Arguments:
1327 isolated_hash: hash of the root *.isolated file.
1328 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001329 cache: ContentAddressedCache class that knows how to store and map files
1330 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001331 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001332 use_symlinks: Use symlinks instead of hardlinks when True.
Ye Kuang5e914472020-09-28 09:18:34 +00001333 filter_cb: filter that works as allowlist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001334
1335 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001336 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001337 """
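  # Typical call (a sketch; the hash and output directory are hypothetical):
  #   bundle = fetch_isolated(
  #       isolated_hash='deadbeef...', storage=storage, cache=cache,
  #       outdir='/tmp/out', use_symlinks=False)
  #   # bundle.command and bundle.relative_cwd describe how to run the task.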
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001338 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001339 'fetch_isolated(%s, %s, %s, %s, %s)',
1340 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001341 # Hash algorithm to use, defined by namespace |storage| is using.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001342 algo = storage.server_ref.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001343 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001344 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001345
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001346 with tools.Profiler('GetIsolateds'):
1347 # Optionally support local files by manually adding them to cache.
1348 if not isolated_format.is_valid_hash(isolated_hash, algo):
Junji Watanabe38b28b02020-04-23 10:23:30 +00001349 logging.debug(
1350 '%s is not a valid hash, assuming a file '
1351 '(algo was %s, hash size was %d)', isolated_hash, algo(),
1352 algo().digest_size)
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001353 path = six.text_type(os.path.abspath(isolated_hash))
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001354 try:
1355 isolated_hash = fetch_queue.inject_local_file(path, algo)
1356 except IOError as e:
1357 raise isolated_format.MappingError(
1358 '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1359 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001360
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001361 # Load all *.isolated and start loading rest of the files.
1362 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001363
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001364 with tools.Profiler('GetRest'):
1365 # Create file system hierarchy.
1366 file_path.ensure_tree(outdir)
1367 create_directories(outdir, bundle.files)
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001368 _create_symlinks(outdir, bundle.files.items())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001369
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001370 # Ensure working directory exists.
1371 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
1372 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001373
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001374 # Multimap: digest -> list of pairs (path, props).
1375 remaining = {}
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001376 for filepath, props in bundle.files.items():
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001377 if 'h' in props:
1378 remaining.setdefault(props['h'], []).append((filepath, props))
1379 fetch_queue.wait_on(props['h'])
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001380
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001381 # Now block on the remaining files to be downloaded and mapped.
1382 logging.info('Retrieving remaining files (%d of them)...',
1383 fetch_queue.pending_count)
1384 last_update = time.time()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001385
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001386 with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
1387 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1388 while remaining:
1389 detector.ping()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001390
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001391 # Wait for any item to finish fetching to cache.
1392 digest = fetch_queue.wait()
tansell9e04a8d2016-07-28 09:31:59 -07001393
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001394 # Create the files in the destination using item in cache as the
1395 # source.
1396 for filepath, props in remaining.pop(digest):
1397 fullpath = os.path.join(outdir, filepath)
tanselle4288c32016-07-28 09:45:40 -07001398
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001399 putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
Takuto Ikutaa5f12c52020-06-11 06:52:01 +00001400 _map_file, fullpath, digest, props,
1401 cache, use_symlinks)
tanselle4288c32016-07-28 09:45:40 -07001402
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001403 # Report progress.
1404 duration = time.time() - last_update
1405 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1406 msg = '%d files remaining...' % len(remaining)
1407 sys.stdout.write(msg + '\n')
1408 sys.stdout.flush()
1409 logging.info(msg)
1410 last_update = time.time()
1411 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
1412 putfile_thread_pool.join()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001413
Marc-Antoine Ruele9558372018-08-03 03:41:22 +00001414 # Save the cache right away so we don't lose the state of the new objects.
1415 cache.save()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001416 # Cache could evict some items we just tried to fetch, it's a fatal error.
1417 if not fetch_queue.verify_all_cached():
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001418 free_disk = file_path.get_free_space(cache.cache_dir)
1419 msg = (
1420 'Cache is too small to hold all requested files.\n'
1421 ' %s\n cache=%dbytes, %d items; %sb free_space') % (
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +00001422 cache.policies, cache.total_size, len(cache), free_disk)
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001423 raise isolated_format.MappingError(msg)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001424 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001425
1426
Ye Kuang3d283e82020-09-25 09:56:27 +00001427def _directory_to_metadata(root, algo, denylist):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001428 """Yields every file and/or symlink found.
1429
1430 Yields:
1431 tuple(FileItem, relpath, metadata)
1432 For a symlink, FileItem is None.
1433 """
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001434 # Current tar file bundle, if any.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001435 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001436 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001437 for relpath, issymlink in isolated_format.expand_directory_and_symlink(
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001438 root,
1439 u'.' + os.path.sep,
Ye Kuang3d283e82020-09-25 09:56:27 +00001440 denylist,
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001441 follow_symlinks=(sys.platform != 'win32')):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001442
1443 filepath = os.path.join(root, relpath)
1444 if issymlink:
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001445 # TODO(maruel): Do not call this.
Takuto Ikutaa5f12c52020-06-11 06:52:01 +00001446 meta = isolated_format.file_to_metadata(filepath, False)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001447 yield None, relpath, meta
1448 continue
1449
1450 prio = relpath.endswith('.isolated')
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001451 if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
1452 # The file was added to the current pending tarball and won't be archived
1453 # individually.
1454 continue
1455
1456 # Flush and reset the bundle.
1457 for i, p, m in bundle.yield_item_path_meta():
1458 yield i, p, m
1459 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001460
1461 # Yield the file individually.
1462 item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
1463 yield item, relpath, item.meta
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001464
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001465 for i, p, m in bundle.yield_item_path_meta():
1466 yield i, p, m
1467
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001468
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001469def _print_upload_stats(items, missing):
1470 """Prints upload stats."""
1471 total = len(items)
1472 total_size = sum(f.size for f in items)
1473 logging.info(
1474 'Total: %6d, %9.1fkiB', total, total_size / 1024.)
1475 cache_hit = set(items).difference(missing)
1476 cache_hit_size = sum(f.size for f in cache_hit)
1477 logging.info(
1478 'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1479 len(cache_hit),
1480 cache_hit_size / 1024.,
1481 len(cache_hit) * 100. / total,
1482 cache_hit_size * 100. / total_size if total_size else 0)
1483 cache_miss = missing
1484 cache_miss_size = sum(f.size for f in cache_miss)
Junji Watanabe38b28b02020-04-23 10:23:30 +00001485 logging.info('cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1486 len(cache_miss), cache_miss_size / 1024.,
1487 len(cache_miss) * 100. / total,
1488 cache_miss_size * 100. / total_size if total_size else 0)
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001489
1490
Ye Kuang3d283e82020-09-25 09:56:27 +00001491def _enqueue_dir(dirpath, denylist, hash_algo, hash_algo_name):
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001492 """Called by archive_files_to_storage for a directory.
1493
1494 Create an .isolated file.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001495
1496 Yields:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001497 FileItem for every file found, plus one for the .isolated file itself.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001498 """
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001499 files = {}
Ye Kuang3d283e82020-09-25 09:56:27 +00001500 for item, relpath, meta in _directory_to_metadata(dirpath, hash_algo,
1501 denylist):
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001502 # item is None for a symlink.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001503 files[relpath] = meta
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001504 if item:
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001505 yield item
1506
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001507 # TODO(maruel): If there is no file, don't yield an .isolated file.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001508 data = {
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001509 'algo': hash_algo_name,
1510 'files': files,
1511 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001512 }
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001513 # Keep the file in memory. This is fine because .isolated files are relatively
1514 # small.
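  # It is yielded with high_priority=True so the .isolated file is uploaded
  # ahead of the bulk file data (presumably mirroring the fetch side, which
  # fetches *.isolated files at PRIORITY_HIGH).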
1515 yield BufferItem(
Takuto Ikuta630f99d2020-07-02 12:59:35 +00001516 tools.format_json(data, True).encode(),
1517 algo=hash_algo,
1518 high_priority=True)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001519
1520
Takuto Ikuta8a5e5992020-04-28 05:09:45 +00001521def _archive_files_to_storage_internal(storage,
1522 files,
Ye Kuang3d283e82020-09-25 09:56:27 +00001523 denylist,
Takuto Ikuta8a5e5992020-04-28 05:09:45 +00001524 verify_push=False):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001525 """Stores every entry into remote storage and returns stats.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001526
1527 Arguments:
1528 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001529 files: iterable of files to upload. If a directory is specified, a
1530 .isolated file is created and its hash is returned.
1531 Duplicates are skipped.
Ye Kuang3d283e82020-09-25 09:56:27 +00001532 denylist: function that returns True if a file should be omitted.
Takuto Ikuta26980872020-04-09 06:56:37 +00001533 verify_push: verify files are uploaded correctly by fetching from server.
maruel064c0a32016-04-05 11:47:15 -07001534
1535 Returns:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001536 tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001537 The first file in the first item is always the .isolated file.
Ye Kuang4e994292020-04-10 07:07:35 +00001538
1539 Raises:
1540 Re-raises the exception in upload_items(), if there is any.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001541 """
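  # Pipeline sketch: this thread walks |files| and feeds FileItem/BufferItem
  # objects into |channel|; the helper thread below drains the channel, uploads
  # the items, and records which digests were actually pushed (the cold set).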
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001542 # Dict of path to hash.
1543 results = collections.OrderedDict()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001544 hash_algo = storage.server_ref.hash_algo
1545 hash_algo_name = storage.server_ref.hash_algo_name
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001546 # Generator of FileItem to pass to upload_items() concurrent operation.
1547 channel = threading_utils.TaskChannel()
Ye Kuang4e994292020-04-10 07:07:35 +00001548 exc_channel = threading_utils.TaskChannel()
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001549 uploaded_digests = set()
Ye Kuang4e994292020-04-10 07:07:35 +00001550
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001551 def _upload_items():
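    # Runs on a helper thread; any exception is forwarded through
    # |exc_channel| so the main thread can re-raise it after t.join().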
Ye Kuang4e994292020-04-10 07:07:35 +00001552 try:
1553 results = storage.upload_items(channel, verify_push)
1554 uploaded_digests.update(f.digest for f in results)
1555 except Exception:
1556 exc_channel.send_exception()
1557
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001558 t = threading.Thread(target=_upload_items)
1559 t.start()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001560
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001561 # Keep track locally of the items to determine cold and hot items.
1562 items_found = []
1563 try:
1564 for f in files:
Takuto Ikuta95459dd2019-10-29 12:39:47 +00001565 assert isinstance(f, six.text_type), repr(f)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001566 if f in results:
1567 # Duplicate
1568 continue
1569 try:
1570 filepath = os.path.abspath(f)
1571 if fs.isdir(filepath):
1572 # Uploading a whole directory.
1573 item = None
Ye Kuang3d283e82020-09-25 09:56:27 +00001574 for item in _enqueue_dir(filepath, denylist, hash_algo,
1575 hash_algo_name):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001576 channel.send_result(item)
1577 items_found.append(item)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001578 # The very last item will be the .isolated file.
1579 if not item:
1580 # There was no file in the directory.
1581 continue
1582 elif fs.isfile(filepath):
1583 item = FileItem(
1584 path=filepath,
1585 algo=hash_algo,
1586 size=None,
1587 high_priority=f.endswith('.isolated'))
1588 channel.send_result(item)
1589 items_found.append(item)
1590 else:
1591 raise Error('%s is neither a file nor a directory.' % f)
1592 results[f] = item.digest
1593 except OSError:
1594 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001595 finally:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001596 # Stops the generator, so _upload_items() can exit.
1597 channel.send_done()
1598 t.join()
Ye Kuang4e994292020-04-10 07:07:35 +00001599 exc_channel.send_done()
Takuto Ikuta640fcbf2020-04-23 00:12:46 +00001600
1601 try:
1602 for _ in exc_channel:
1603 pass
1604 except Exception:
1605 # log items when failed to upload files.
1606 for item in items_found:
1607 if isinstance(item, FileItem):
1608 logging.error('FileItem path: %s, digest:%s, re-calculated digest:%s',
1609 item.path, item.digest,
1610 isolated_format.hash_file(item.path, item.algo))
1611 continue
1612
1613 logging.error('Item digest:%s', item.digest)
1614
1615 raise
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001616
1617 cold = []
1618 hot = []
1619 for i in items_found:
1620 # Note that multiple FileItem may have the same .digest.
1621 if i.digest in uploaded_digests:
1622 cold.append(i)
1623 else:
1624 hot.append(i)
1625 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001626
1627
Takuto Ikuta8a5e5992020-04-28 05:09:45 +00001628# TODO(crbug.com/1073832):
1629# remove this if process leak in coverage build was fixed.
Ye Kuang3d283e82020-09-25 09:56:27 +00001630def archive_files_to_storage(storage, files, denylist, verify_push=False):
Takuto Ikuta8a5e5992020-04-28 05:09:45 +00001631 """Calls _archive_files_to_storage_internal with retry.
1632
1633 Arguments:
1634 See Arguments section in _archive_files_to_storage_internal
1635
1636 Returns:
1637 See Returns section in _archive_files_to_storage_internal
1638
1639 Raises:
1640 Re-raises the exception from _archive_files_to_storage_internal if all
1641 retries failed.
1642 """
1643
1644 # Will do exponential backoff.
1645 # e.g. 10, 20, 40, 80
1646 backoff = 10
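  # Given the "backoff > 100" check below, this makes at most 5 attempts,
  # sleeping 10+20+40+80 = 150 seconds in total before giving up.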
1647
1648 while True:
1649 try:
Ye Kuang3d283e82020-09-25 09:56:27 +00001650 return _archive_files_to_storage_internal(storage, files, denylist,
Takuto Ikuta8a5e5992020-04-28 05:09:45 +00001651 verify_push)
1652 except Exception:
1653 if backoff > 100:
1654 raise
1655
Takuto Ikutaf98be312020-05-08 10:22:54 +00001656 on_error.report('error before %d second backoff' % backoff)
1657
Takuto Ikuta8a5e5992020-04-28 05:09:45 +00001658 logging.exception(
1659 'failed to run _archive_files_to_storage_internal,'
1660 ' will retry after %d seconds', backoff)
1661 time.sleep(backoff)
1662 backoff *= 2
1663
1664
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001665@subcommand.usage('<file1..fileN> or - to read from stdin')
1666def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001667 """Archives data to the server.
1668
1669 If a directory is specified, a .isolated file is created the whole directory
1670 is uploaded. Then this .isolated file can be included in another one to run
1671 commands.
1672
1673 The commands output each file that was processed with its content hash. For
1674 directories, the .isolated generated for the directory is listed as the
1675 directory entry itself.
1676 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001677 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001678 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001679 options, files = parser.parse_args(args)
Takuto Ikutaae767b32020-05-11 01:22:19 +00001680 process_isolate_server_options(parser, options, True)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001681 server_ref = isolate_storage.ServerRef(
1682 options.isolate_server, options.namespace)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001683 if files == ['-']:
1684 files = (l.rstrip('\n\r') for l in sys.stdin)
1685 if not files:
1686 parser.error('Nothing to upload')
Takuto Ikuta8bf73262020-07-01 12:59:57 +00001687 files = (six.ensure_text(f) for f in files)
Ye Kuang3d283e82020-09-25 09:56:27 +00001688 denylist = tools.gen_denylist(options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001689 try:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001690 with get_storage(server_ref) as storage:
Ye Kuang3d283e82020-09-25 09:56:27 +00001691 results, _cold, _hot = archive_files_to_storage(storage, files, denylist)
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001692 except (Error, local_caching.NoMoreSpace) as e:
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001693 parser.error(e.args[0])
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001694 print('\n'.join('%s %s' % (h, f) for f, h in results.items()))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001695 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001696
1697
1698def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001699 """Download data from the server.
1700
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001701 It can either download individual files or a complete tree from a .isolated
1702 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001703 """
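  # Example invocations (sketches; the server URL and hash are hypothetical):
  #   isolateserver.py download -I https://isolate.example.com -s <hash> -t out/
  #   isolateserver.py download -I https://isolate.example.com -f <hash> foo.txt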
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001704 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001705 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05001706 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001707 help='hash of an isolated file, .isolated file content is discarded, use '
1708 '--file if you need it')
1709 parser.add_option(
Junji Watanabe38b28b02020-04-23 10:23:30 +00001710 '-f',
1711 '--file',
1712 metavar='HASH DEST',
1713 default=[],
1714 action='append',
1715 nargs=2,
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001716 help='hash and destination of a file, can be used multiple times')
1717 parser.add_option(
Junji Watanabe38b28b02020-04-23 10:23:30 +00001718 '-t',
1719 '--target',
1720 metavar='DIR',
1721 default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001722 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07001723 parser.add_option(
Junji Watanabe38b28b02020-04-23 10:23:30 +00001724 '--use-symlinks',
1725 action='store_true',
maruel4409e302016-07-19 14:25:51 -07001726 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001727 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001728 options, args = parser.parse_args(args)
1729 if args:
1730 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruel5028ba22017-08-25 17:37:51 -04001731 if not file_path.enable_symlink():
Marc-Antoine Ruel5a024272019-01-15 20:11:16 +00001732 logging.warning('Symlink support is not enabled')
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001733
Takuto Ikutaae767b32020-05-11 01:22:19 +00001734 process_isolate_server_options(parser, options, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001735 if bool(options.isolated) == bool(options.file):
1736 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07001737 if not options.cache and options.use_symlinks:
1738 parser.error('--use-symlinks requires the use of a cache with --cache')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001739
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001740 cache = process_cache_options(options, trim=True)
maruel2e8d0f52016-07-16 07:51:29 -07001741 cache.cleanup()
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001742 options.target = six.text_type(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001743 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07001744 if (fs.isfile(options.target) or
1745 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001746 parser.error(
1747 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001748 server_ref = isolate_storage.ServerRef(
1749 options.isolate_server, options.namespace)
1750 with get_storage(server_ref) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08001751 # Fetching individual files.
1752 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001753 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08001754 channel = threading_utils.TaskChannel()
1755 pending = {}
1756 for digest, dest in options.file:
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001757 dest = six.text_type(dest)
Vadim Shtayura3172be52013-12-03 12:49:05 -08001758 pending[digest] = dest
1759 storage.async_fetch(
Junji Watanabe38b28b02020-04-23 10:23:30 +00001760 channel, threading_utils.PRIORITY_MED, digest,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001761 local_caching.UNKNOWN_FILE_SIZE,
Junji Watanabe38b28b02020-04-23 10:23:30 +00001762 functools.partial(local_caching.file_write,
1763 os.path.join(options.target, dest)))
Vadim Shtayura3172be52013-12-03 12:49:05 -08001764 while pending:
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +00001765 fetched = channel.next()
Vadim Shtayura3172be52013-12-03 12:49:05 -08001766 dest = pending.pop(fetched)
1767 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001768
Vadim Shtayura3172be52013-12-03 12:49:05 -08001769 # Fetching whole isolated tree.
1770 if options.isolated:
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001771 bundle = fetch_isolated(
1772 isolated_hash=options.isolated,
1773 storage=storage,
1774 cache=cache,
1775 outdir=options.target,
1776 use_symlinks=options.use_symlinks)
1777 cache.trim()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001778 if bundle.command:
1779 rel = os.path.join(options.target, bundle.relative_cwd)
1780 print('To run this test please run from the directory %s:' %
1781 os.path.join(options.target, rel))
1782 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001783
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001784 return 0
1785
1786
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001787def add_archive_options(parser):
1788 parser.add_option(
1789 '--blacklist',
Ye Kuang3d283e82020-09-25 09:56:27 +00001790 action='append',
1791 default=list(DEFAULT_DENYLIST),
1792 help='List of regexp to use as denylist filter when uploading '
1793 'directories')
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001794
1795
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001796def add_isolate_server_options(parser):
1797 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001798 parser.add_option(
1799 '-I', '--isolate-server',
1800 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001801 help='URL of the Isolate Server to use. Defaults to the environment '
1802 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1803 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001804 parser.add_option(
Junji Watanabe38b28b02020-04-23 10:23:30 +00001805 '--namespace',
1806 default='default-gzip',
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001807 help='The namespace to use on the Isolate Server, default: %default')
1808
1809
Takuto Ikutaae767b32020-05-11 01:22:19 +00001810def process_isolate_server_options(parser, options, required):
nodir55be77b2016-05-03 09:39:57 -07001811 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001812
1813 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001814 """
1815 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001816 if required:
1817 parser.error('--isolate-server is required.')
1818 return
1819
Takuto Ikuta47c77612020-06-29 04:23:39 +00001820 try:
1821 options.isolate_server = net.fix_url(options.isolate_server)
1822 except ValueError as e:
1823 parser.error('--isolate-server %s' % e)
Takuto Ikutaae767b32020-05-11 01:22:19 +00001824
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001825 try:
1826 return auth.ensure_logged_in(options.isolate_server)
1827 except ValueError as e:
1828 parser.error(str(e))
Marc-Antoine Ruel793bff32019-04-18 17:50:48 +00001829 return None
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001830
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001831
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001832def add_cache_options(parser):
Junji Watanabeb03450b2020-09-25 05:09:27 +00001833 cache_group = optparse.OptionGroup(parser, 'Isolated cache management')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001834 cache_group.add_option(
Marc-Antoine Ruel5aeb3bb2018-06-16 13:11:02 +00001835 '--cache', metavar='DIR', default='cache',
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001836 help='Directory to keep a local cache of the files. Accelerates download '
1837 'by reusing already downloaded files. Default=%default')
1838 cache_group.add_option(
1839 '--max-cache-size',
1840 type='int',
1841 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08001842 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001843 help='Trim if the cache gets larger than this value, default=%default')
1844 cache_group.add_option(
1845 '--min-free-space',
1846 type='int',
1847 metavar='NNN',
1848 default=2*1024*1024*1024,
1849 help='Trim if disk free space becomes lower than this value, '
1850 'default=%default')
1851 cache_group.add_option(
1852 '--max-items',
1853 type='int',
1854 metavar='NNN',
1855 default=100000,
1856 help='Trim if more than this number of items are in the cache, '
Junji Watanabe38b28b02020-04-23 10:23:30 +00001857 'default=%default')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001858 parser.add_option_group(cache_group)
1859
1860
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001861def process_cache_options(options, trim, **kwargs):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001862 if options.cache:
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -04001863 policies = local_caching.CachePolicies(
1864 options.max_cache_size,
1865 options.min_free_space,
1866 options.max_items,
1867 # 3 weeks.
Junji Watanabe38b28b02020-04-23 10:23:30 +00001868 max_age_secs=21 * 24 * 60 * 60)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001869
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001870 # |options.cache| path may not exist until DiskContentAddressedCache()
1871 # instance is created.
1872 return local_caching.DiskContentAddressedCache(
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001873 six.text_type(os.path.abspath(options.cache)), policies, trim, **kwargs)
Marc-Antoine Ruel793bff32019-04-18 17:50:48 +00001874 return local_caching.MemoryContentAddressedCache()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001875
1876
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001877class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
Junji Watanabe38b28b02020-04-23 10:23:30 +00001878
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001879 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001880 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001881 self,
1882 version=__version__,
1883 prog=os.path.basename(sys.modules[__name__].__file__),
1884 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001885 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001886
1887 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001888 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001889 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08001890 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001891 return options, args
1892
1893
1894def main(args):
1895 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04001896 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001897
1898
1899if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07001900 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001901 fix_encoding.fix_encoding()
1902 tools.disable_buffering()
1903 colorama.init()
Takuto Ikuta160b4452020-04-15 06:33:55 +00001904 net.set_user_agent('isolateserver.py/' + __version__)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001905 sys.exit(main(sys.argv[1:]))