#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import net
from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand
import six
from six.moves import queue as Queue

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import on_error
from utils import subprocess42
from utils import threading_utils


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). The first ITEMS_PER_CONTAINS_QUERIES[0] files are
# taken and sent to '/pre-upload', then the next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. The numbers here are a trade-off; the more per request, the lower
# the effect of HTTP round trip latency and TCP-level chattiness. On the other
# hand, larger values cause longer lookups, increasing the initial latency to
# start uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of
# large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
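# For instance (an illustrative walk-through, not from the code): looking up
# 170 items would issue /pre-upload batches of 20, 20, 50 and 50 items, with
# the remaining 30 flushed once the input stream ends.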


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
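# Illustrative examples of paths the blacklist matches (hypothetical names):
#   'foo/bar.pyc'  -> filtered out (compiled python file)
#   'bar.swp'      -> filtered out (vim swap file)
#   'src/.git'     -> filtered out (SCM metadata directory)
#   'src/main.py'  -> kept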


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def fileobj_path(fileobj):
  """Return file system path for file like object or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file like object was created by something like open("test.txt"),
  # possibly by code outside our control (such as the standard library), name
  # will end up being a str. We want all our paths to be unicode objects, so we
  # decode it.
  if not isinstance(name, six.text_type):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)
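
# A minimal sketch of fileobj_copy with in-memory buffers (illustrative only,
# not part of the production flow):
#   import io
#   src = io.BytesIO(b'0123456789')
#   dst = io.BytesIO()
#   fileobj_copy(dst, src, size=4)  # copies exactly 4 bytes
#   assert dst.getvalue() == b'0123'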


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports that ext2 is potentially faulty under a high rate of
    hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
    assert fs.exists(dstpath)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE:
    # On Windows, modes other than removing stat.S_IWRITE are ignored. Return
    # early to skip the slow/unnecessary chmod call.
    return

  # file_mode of 0 is actually valid, so an explicit check is needed.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
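
# A minimal putfile usage sketch (illustrative; the paths and mode are
# hypothetical):
#   with fs.open('/cache/deadbeef', 'rb') as src:
#     putfile(src, '/work/out.bin', file_mode=0o500)
# Because the source is a real on-disk file, no size override is given and the
# mode has no write bits, this takes the link_file() fast path instead of
# copying byte by byte.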


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
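
# A round-trip sketch of the two helpers above (illustrative only):
#   chunks = [b'hello ', b'world']
#   compressed = list(zip_compress(chunks))
#   assert b''.join(zip_decompress(compressed)) == b'hello world'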


def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # splitext() keeps the leading dot; drop it to match the extension list.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Computes the full set of directories that need to be created.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)
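
# For example (illustrative): for files ['a/b/c.txt', 'd.txt'],
# create_directories(base, files) creates base/a and then base/a/b; 'd.txt'
# needs no directory.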


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""
  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()
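
  # Usage sketch (illustrative): one thread writes, another iterates.
  #   obj = _ThreadFile()
  #   # writer thread:   obj.write(b'chunk'); ...; obj.close()
  #   # consumer thread: for chunk in obj: handle(chunk)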


class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, 0, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)
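
  # A minimal FileItem sketch (illustrative; the path is hypothetical and the
  # hash algorithm comes from the namespace's ServerRef):
  #   item = FileItem(path='/tmp/data.bin', algo=server_ref.hash_algo)
  #   item.digest  # hashes the file lazily on first access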


class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on the
  fly.
  """

  def __init__(self, root, algo):
    # Two trailing 512-byte headers.
    super(TarBundle, self).__init__(size=1024)
    self._items = []
    self._meta = None
    self._algo = algo
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Try to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should be
    migrated to use the Go code instead.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # https://crbug.com/825418
    return False
    self._size += rounded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Yields tuples (Item, filepath, meta_dict).

    If the bundle contains fewer than 5 items, the individual items are
    yielded instead.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
    def _tar_thread():
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
        'h': self.digest,
        's': self.size,
        't': u'tar',
    }

499
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400500class BufferItem(isolate_storage.Item):
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000501 """A byte buffer to push to Storage."""
502
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +0000503 def __init__(self, buf, algo, high_priority=False):
504 super(BufferItem, self).__init__(
505 digest=algo(buf).hexdigest(),
506 size=len(buf),
507 high_priority=high_priority)
508 self._buffer = buf
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000509
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800510 def content(self):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +0000511 return [self._buffer]
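
  # Usage sketch (illustrative; server_ref is hypothetical): wrap an in-memory
  # payload, e.g. serialized *.isolated JSON, so it uploads like any other
  # item:
  #   item = BufferItem(b'{"version": "1.4"}', server_ref.hash_algo,
  #                     high_priority=True)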


class Storage(object):
  """Efficiently downloads or uploads large sets of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
      if sys.maxsize <= max_size:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread-safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items, verify_push=False):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes to the first thread. Then it
    join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread                      Thread #1
    (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                                Thread #2
         |        |
         v        v
    (missing)  (was on server)
         |
         v
    _handle_missing_thread                            Thread #3
          |
          v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
          upload.
      verify_push: verify files are uploaded correctly by fetching from server.

    Returns:
      List of items that were uploaded. All other items are already there.

    Raises:
      The first exception being raised in the worker threads.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []
    exc_channel = threading_utils.TaskChannel()

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.items():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().items():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      try:
        with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
          channel = threading_utils.TaskChannel()
          pending_upload = 0
          while not self._aborted:
            try:
              missing_item, push_state = missing.get(True, timeout=5)
              if missing_item is None:
                break
              self._async_push(channel, missing_item, push_state, verify_push)
              pending_upload += 1
            except Queue.Empty:
              pass
            detector.ping()
            while not self._aborted and pending_upload:
              try:
                item = channel.next(timeout=0)
              except threading_utils.TaskChannel.Timeout:
                break
              uploaded.append(item)
              pending_upload -= 1
              logging.debug('Uploaded %d; %d pending: %s (%d)', len(uploaded),
                            pending_upload, item.digest, item.size)
          while not self._aborted and pending_upload:
            item = channel.next()
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
      except Exception:
        exc_channel.send_exception()

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point
      # of view of the isolate server (it doesn't care about paths at all,
      # only content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. This is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()
      exc_channel.send_done()
      for _ in exc_channel:
        # If there is no exception, this loop does nothing. Otherwise, it
        # raises the first exception put onto |exc_channel|.
        pass

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded
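
  # A minimal end-to-end upload sketch (illustrative; the server URL,
  # namespace and paths are hypothetical):
  #   server_ref = isolate_storage.ServerRef(
  #       'https://isolate.example.com', 'default-gzip')
  #   with get_storage(server_ref) as storage:
  #     items = [FileItem(path=p, algo=server_ref.hash_algo) for p in paths]
  #     uploaded = storage.upload_items(items)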

  def _async_push(self, channel, item, push_state, verify_push=False):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).
      verify_push: verify files are uploaded correctly by fetching from server.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      if verify_push:
        try:
          self._fetch(
              item.digest,
              item.size,
              # This consumes all elements from the given generator.
              lambda gen: collections.deque(gen, maxlen=0))
        except Exception:
          # Reset push_state if it failed to verify.
          push_state.finalized = False
          push_state.uploaded = False
          raise
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Until it's done, assemble
      # the content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        # In Python3, zlib.compress returns a byte object instead of str.
        data = six.b('').join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def _fetch(self, digest, size, sink):
    try:
      # Prepare reading pipeline.
      stream = self._storage_api.fetch(digest, size, 0)
      if self.server_ref.is_with_compression:
        stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
      # Run |stream| through verifier that will assert its size.
      verifier = FetchStreamVerifier(stream, self.server_ref.hash_algo, digest,
                                     size)
      # Verified stream goes to |sink|.
      sink(verifier.run())
    except Exception:
      logging.exception('Failed to fetch %s', digest)
      raise

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      self._fetch(digest, size, sink)
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves. In the first wave it is called for
    the HIGH priority items, the isolated files themselves. In the second wave
    it is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)
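
  # Typical usage (an illustrative sketch):
  #   fetch_queue = FetchQueue(storage, cache)
  #   fetch_queue.add(digest, size)   # schedules an async fetch into the cache
  #   fetch_queue.wait_on(digest)
  #   fetched = fetch_queue.wait()    # blocks until a waited-on digest lands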


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
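
  # Usage sketch (illustrative; storage_api, digest and sink are hypothetical,
  # with hashlib.sha1 as an example hasher):
  #   stream = storage_api.fetch(digest, size, 0)
  #   verifier = FetchStreamVerifier(stream, hashlib.sha1, digest, size)
  #   for chunk in verifier.run():  # raises IOError on size/digest mismatch
  #     sink.write(chunk)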
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001112
1113
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001114class IsolatedBundle(object):
1115 """Fetched and parsed .isolated file with all dependencies."""
1116
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001117 def __init__(self, filter_cb):
1118 """
1119 filter_cb: callback function to filter downloaded content.
1120 When filter_cb is not None, Isolated file is downloaded iff
1121 filter_cb(filepath) returns True.
1122 """
1123
Vadim Shtayura3148e072014-09-02 18:51:52 -07001124 self.command = []
1125 self.files = {}
1126 self.read_only = None
1127 self.relative_cwd = None
1128 # The main .isolated file, a IsolatedFile instance.
1129 self.root = None
1130
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001131 self._filter_cb = filter_cb
1132
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001133 def fetch(self, fetch_queue, root_isolated_hash, algo):
1134 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001135
1136 It enables support for "included" .isolated files. They are processed in
1137 strict order but fetched asynchronously from the cache. This is important so
1138 that a file in an included .isolated file that is overridden by an embedding
1139 .isolated file is not fetched needlessly. The includes are fetched in one
1140 pass and the files are fetched as soon as all the ones on the left-side
1141 of the tree were fetched.
1142
1143 The prioritization is very important here for nested .isolated files.
1144 'includes' have the highest priority and the algorithm is optimized for both
1145 deep and wide trees. A deep one is a long link of .isolated files referenced
1146 one at a time by one item in 'includes'. A wide one has a large number of
1147 'includes' in a single .isolated file. 'left' is defined as an included
1148 .isolated file earlier in the 'includes' list. So the order of the elements
1149 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001150
1151 As a side effect this method starts asynchronous fetch of all data files
1152 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1153 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001154 """
1155 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1156
1157 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1158 pending = {}
1159 # Set of hashes of already retrieved items to refuse recursive includes.
1160 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001161 # Set of IsolatedFile's whose data files have already being fetched.
1162 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001163
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001164 def retrieve_async(isolated_file):
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001165 """Retrieves an isolated file included by the root bundle."""
Vadim Shtayura3148e072014-09-02 18:51:52 -07001166 h = isolated_file.obj_hash
1167 if h in seen:
1168 raise isolated_format.IsolatedError(
1169 'IsolatedFile %s is retrieved recursively' % h)
1170 assert h not in pending
1171 seen.add(h)
1172 pending[h] = isolated_file
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001173 # This isolated item is being added dynamically, notify FetchQueue.
1174 fetch_queue.wait_on(h)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001175 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1176
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001177 # Start fetching root *.isolated file (single file, not the whole bundle).
1178 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001179
    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait()
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, which waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # By now all *.isolated files, and only them, should have been processed.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

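  # Illustration of the traversal above, under a hypothetical include graph:
  #
  #   root.isolated  includes  a.isolated, b.isolated
  #   a.isolated     includes  c.isolated
  #
  # Every *.isolated file is queued for download as soon as it is discovered,
  # but data files are requested strictly in isolated_format.walk_includes()
  # order. If the next node in that order has not loaded yet, the inner loop
  # breaks and the outer loop blocks in fetch_queue.wait() until another
  # *.isolated file arrives.
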
  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.items():
      if self._filter_cb and not self._filter_cb(filepath):
        continue

      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # If the isolated tree is read-only, make sure the mode doesn't have
        # write bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def get_storage(server_ref):
  """Returns a Storage instance that can upload to and download from
  |server_ref|.

  Arguments:
    server_ref: isolate_storage.ServerRef instance.

  Returns:
    Instance of Storage.
  """
  # Handle the specific internal use case.
  assert (isinstance(server_ref, isolate_storage.ServerRef) or
          type(server_ref).__name__ == 'ServerRef'), repr(server_ref)
  return Storage(isolate_storage.get_storage_api(server_ref))

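# Example (illustrative, not part of the API): callers typically construct a
# ServerRef and use the returned Storage as a context manager; the server URL
# below is hypothetical.
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     ...  # e.g. storage.upload_items(...) or storage.async_fetch(...)
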
def _map_file(dst, digest, props, cache, read_only, use_symlinks):
  """Puts a downloaded file at its destination path.

  This function is used for multi-threaded file mapping.
  """
  with tools.Profiler("_map_file for %s" % dst):
    with cache.getfileobj(digest) as srcfileobj:
      filetype = props.get('t', 'basic')

      if filetype == 'basic':
        # Ignore all bits apart from the user.
        file_mode = (props.get('m') or 0o500) & 0o700
        if read_only:
          # Enforce read-only if the root bundle does.
          file_mode &= 0o500
        putfile(srcfileobj, dst, file_mode, use_symlink=use_symlinks)

      elif filetype == 'tar':
        basedir = os.path.dirname(dst)
        with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
          ensured_dirs = set()
          for ti in t:
            if not ti.isfile():
              logging.warning('Path(%r) is nonfile (%s), skipped', ti.name,
                              ti.type)
              continue
            # Handle files created on Windows fetched on POSIX and the
            # reverse.
            other_sep = '/' if os.path.sep == '\\' else '\\'
            name = ti.name.replace(other_sep, os.path.sep)
            fp = os.path.normpath(os.path.join(basedir, name))
            if not fp.startswith(basedir):
              # Skip entries that would escape the destination directory.
              logging.error('Path(%r) is outside root directory', fp)
              continue
            ifd = t.extractfile(ti)
            fp_dir = os.path.dirname(fp)
            if fp_dir not in ensured_dirs:
              file_path.ensure_tree(fp_dir)
              ensured_dirs.add(fp_dir)
            file_mode = ti.mode & 0o700
            if read_only:
              # Enforce read-only if the root bundle does.
              file_mode &= 0o500
            putfile(ifd, fp, file_mode, ti.size)

      else:
        raise isolated_format.IsolatedError('Unknown file type %r' % filetype)

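# For reference, a sketch of the file properties dict ('props' above) as found
# in a *.isolated file; see isolated_format for the authoritative schema. All
# values are hypothetical.
#
#   {
#     'h': '58e6b3a414a1e090dfc6029add0f3555ccba127f',  # content digest
#     's': 1234,     # size in bytes
#     'm': 0o750,    # optional POSIX mode bits
#     't': 'tar',    # optional file type, defaults to 'basic'
#   }
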
def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
                   filter_cb=None):
  """Aggressively downloads the .isolated file(s), then downloads all the
  files.

  Arguments:
    isolated_hash: hash of the root *.isolated file.
    storage: Storage instance that communicates with isolate storage.
    cache: ContentAddressedCache instance that knows how to store and map
           files locally.
    outdir: Output directory to map file tree to.
    use_symlinks: Use symlinks instead of hardlinks when True.
    filter_cb: predicate returning True for the files to download; acts as a
               whitelist.

  Returns:
    IsolatedBundle object that holds details about the loaded *.isolated file.
  """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.server_ref.hash_algo
  fetch_queue = FetchQueue(storage, cache)
  bundle = IsolatedBundle(filter_cb)

  with tools.Profiler('GetIsolateds'):
    # Optionally support local files by manually adding them to cache.
    if not isolated_format.is_valid_hash(isolated_hash, algo):
      logging.debug('%s is not a valid hash, assuming a file '
                    '(algo was %s, hash size was %d)',
                    isolated_hash, algo(), algo().digest_size)
      path = six.text_type(os.path.abspath(isolated_hash))
      try:
        isolated_hash = fetch_queue.inject_local_file(path, algo)
      except IOError as e:
        raise isolated_format.MappingError(
            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
            'valid hash (error: %s)?' % (isolated_hash, e))

    # Load all *.isolated files and start loading the rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)

  with tools.Profiler('GetRest'):
    # Create file system hierarchy.
    file_path.ensure_tree(outdir)
    create_directories(outdir, bundle.files)
    _create_symlinks(outdir, bundle.files.items())

    # Ensure working directory exists.
    cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
    file_path.ensure_tree(cwd)

    # Multimap: digest -> list of pairs (path, props).
    remaining = {}
    for filepath, props in bundle.files.items():
      if 'h' in props:
        remaining.setdefault(props['h'], []).append((filepath, props))
        fetch_queue.wait_on(props['h'])

    # Now block on the remaining files to be downloaded and mapped.
    logging.info('Retrieving remaining files (%d of them)...',
                 fetch_queue.pending_count)
    last_update = time.time()

    with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        while remaining:
          detector.ping()

          # Wait for any item to finish fetching to cache.
          digest = fetch_queue.wait()

          # Create the files in the destination using the item in cache as
          # the source.
          for filepath, props in remaining.pop(digest):
            fullpath = os.path.join(outdir, filepath)

            putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
                                         _map_file, fullpath, digest,
                                         props, cache, bundle.read_only,
                                         use_symlinks)

          # Report progress.
          duration = time.time() - last_update
          if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
            msg = '%d files remaining...' % len(remaining)
            sys.stdout.write(msg + '\n')
            sys.stdout.flush()
            logging.info(msg)
            last_update = time.time()
      assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
      putfile_thread_pool.join()

  # Save the cache right away to not lose the state of the new objects.
  cache.save()
  # The cache could evict some items we just tried to fetch; that is a fatal
  # error.
  if not fetch_queue.verify_all_cached():
    free_disk = file_path.get_free_space(cache.cache_dir)
    msg = (
        'Cache is too small to hold all requested files.\n'
        '  %s\n  cache=%d bytes, %d items; %s bytes free space') % (
          cache.policies, cache.total_size, len(cache), free_disk)
    raise isolated_format.MappingError(msg)
  return bundle

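# Example (illustrative sketch): fetching a tree into a fresh directory with
# an in-memory cache; the server URL and hash below are hypothetical.
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     bundle = fetch_isolated(
#         isolated_hash='f4d1f9c0a6c8961f52a1a768fa2b3e97c93507bb',
#         storage=storage,
#         cache=local_caching.MemoryContentAddressedCache(),
#         outdir=u'/tmp/out',
#         use_symlinks=False)
#   print(' '.join(bundle.command))
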
def _directory_to_metadata(root, algo, blacklist):
  """Yields every file and/or symlink found.

  Yields:
    tuple(FileItem, relpath, metadata)
    For a symlink, FileItem is None.
  """
  root = file_path.get_native_path_case(root)
  # Current tar file bundle, if any.
  bundle = TarBundle(root, algo)
  for relpath, issymlink in isolated_format.expand_directory_and_symlink(
      root,
      u'.' + os.path.sep,
      blacklist,
      follow_symlinks=(sys.platform != 'win32')):

    filepath = os.path.join(root, relpath)
    if issymlink:
      # TODO(maruel): Do not call this.
      meta = isolated_format.file_to_metadata(filepath, 0, False)
      yield None, relpath, meta
      continue

    prio = relpath.endswith('.isolated')
    if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
      # The file was added to the current pending tarball and won't be
      # archived individually.
      continue

    # Flush and reset the bundle.
    for i, p, m in bundle.yield_item_path_meta():
      yield i, p, m
    bundle = TarBundle(root, algo)

    # Yield the file individually.
    item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
    yield item, relpath, item.meta

  for i, p, m in bundle.yield_item_path_meta():
    yield i, p, m

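# For reference, sketches of the tuples yielded above for a directory holding
# one regular file and one symlink; all values are hypothetical.
#
#   (FileItem(path=u'/src/foo.txt', ...), u'foo.txt', {'h': ..., 's': ...})
#   (None, u'link', {'l': u'foo.txt'})
#
# where 'l' holds the symlink target, as produced by
# isolated_format.file_to_metadata().
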
def _print_upload_stats(items, missing):
  """Prints upload stats."""
  total = len(items)
  total_size = sum(f.size for f in items)
  logging.info(
      'Total: %6d, %9.1fkiB', total, total_size / 1024.)
  cache_hit = set(items).difference(missing)
  cache_hit_size = sum(f.size for f in cache_hit)
  logging.info(
      'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_hit),
      cache_hit_size / 1024.,
      len(cache_hit) * 100. / total,
      cache_hit_size * 100. / total_size if total_size else 0)
  cache_miss = missing
  cache_miss_size = sum(f.size for f in cache_miss)
  logging.info(
      'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_miss),
      cache_miss_size / 1024.,
      len(cache_miss) * 100. / total,
      cache_miss_size * 100. / total_size if total_size else 0)

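# Example of the log lines emitted above (illustrative numbers):
#
#   Total:      42,    1234.5kiB
#   cache hit:  30,     900.0kiB,  71.43% files,  72.90% size
#   cache miss: 12,     334.5kiB,  28.57% files,  27.10% size
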
def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
  """Called by archive_files_to_storage for a directory.

  Creates an .isolated file.

  Yields:
    FileItem for every file found, plus one for the .isolated file itself.
  """
  files = {}
  for item, relpath, meta in _directory_to_metadata(
      dirpath, hash_algo, blacklist):
    # item is None for a symlink.
    files[relpath] = meta
    if item:
      yield item

  # TODO(maruel): If there's no file, don't yield an .isolated file.
  data = {
      'algo': hash_algo_name,
      'files': files,
      'version': isolated_format.ISOLATED_FILE_VERSION,
  }
  # Keep the file in memory. This is fine because .isolated files are
  # relatively small.
  yield BufferItem(
      tools.format_json(data, True), algo=hash_algo, high_priority=True)

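# For reference, a sketch of the generated in-memory .isolated content;
# digests and sizes are hypothetical.
#
#   {
#     'algo': 'sha-1',
#     'files': {
#       u'foo.txt': {'h': '58e6b3a414a1e090dfc6029add0f3555ccba127f',
#                    's': 12, 'm': 0o600},
#     },
#     'version': isolated_format.ISOLATED_FILE_VERSION,
#   }
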
def archive_files_to_storage(storage, files, blacklist, verify_push=False):
  """Stores every entry into remote storage and returns stats.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: iterable of files to upload. If a directory is specified (with a
          trailing slash), a .isolated file is created and its hash is
          returned. Duplicates are skipped.
    blacklist: function that returns True if a file should be omitted.
    verify_push: verify files are uploaded correctly by fetching from server.

  Returns:
    tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the .isolated file.

  Raises:
    Re-raises the exception in upload_items(), if there is any.
  """
  # Dict of path to hash.
  results = collections.OrderedDict()
  hash_algo = storage.server_ref.hash_algo
  hash_algo_name = storage.server_ref.hash_algo_name
  # Generator of FileItem to pass to upload_items() concurrent operation.
  channel = threading_utils.TaskChannel()
  exc_channel = threading_utils.TaskChannel()
  uploaded_digests = set()

  def _upload_items():
    try:
      # Use a local name distinct from the outer |results| dict.
      uploaded = storage.upload_items(channel, verify_push)
      uploaded_digests.update(f.digest for f in uploaded)
    except Exception:
      exc_channel.send_exception()

  t = threading.Thread(target=_upload_items)
  t.start()

  # Keep track locally of the items to determine cold and hot items.
  items_found = []
  try:
    for f in files:
      assert isinstance(f, six.text_type), repr(f)
      if f in results:
        # Duplicate.
        continue
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          item = None
          for item in _enqueue_dir(
              filepath, blacklist, hash_algo, hash_algo_name):
            channel.send_result(item)
            items_found.append(item)
          # The very last item will be the .isolated file.
          if not item:
            # There was no file in the directory.
            continue
        elif fs.isfile(filepath):
          item = FileItem(
              path=filepath,
              algo=hash_algo,
              size=None,
              high_priority=f.endswith('.isolated'))
          channel.send_result(item)
          items_found.append(item)
        else:
          raise Error('%s is neither a file nor a directory.' % f)
        results[f] = item.digest
      except OSError:
        raise Error('Failed to process %s.' % f)
  finally:
    # Stops the generator, so _upload_items() can exit.
    channel.send_done()
    t.join()
    exc_channel.send_done()
    for _ in exc_channel:
      pass

  cold = []
  hot = []
  for i in items_found:
    # Note that multiple FileItems may have the same .digest.
    if i.digest in uploaded_digests:
      cold.append(i)
    else:
      hot.append(i)
  return results, cold, hot

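# Example (illustrative sketch): archiving a directory and a single file; the
# directory entry maps to the hash of its generated .isolated file. Paths and
# the blacklist pattern are hypothetical.
#
#   with get_storage(server_ref) as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage,
#         [u'/src/data/', u'/src/main.py'],
#         tools.gen_blacklist([r'.*\.pyc$']))
#
# |results| maps each input path to its digest; |cold| lists the items that
# actually had to be uploaded, |hot| the ones the server already had.
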
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. This .isolated file can then be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  if files == ['-']:
    files = (l.rstrip('\n\r') for l in sys.stdin)
  if not files:
    parser.error('Nothing to upload')
  files = (f.decode('utf-8') for f in files)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(server_ref) as storage:
      results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
  except (Error, local_caching.NoMoreSpace) as e:
    parser.error(e.args[0])
  print('\n'.join('%s %s' % (h, f) for f, h in results.items()))
  return 0

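# Example invocations (the server URL is hypothetical):
#
#   isolateserver.py archive -I https://isolate.example.com \
#       --namespace default-gzip /path/to/dir/
#   isolateserver.py download -I https://isolate.example.com \
#       -s 58e6b3a414a1e090dfc6029add0f3555ccba127f --target out
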
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.warning('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = six.text_type(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  with get_storage(server_ref) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = six.text_type(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(
                local_caching.file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.next()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        # |rel| already includes options.target; don't join it twice.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print('  ' + ' '.join(bundle.command))

  return 0

def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')

def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))
  return None


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR', default='cache',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, trim, **kwargs):
  if options.cache:
    policies = local_caching.CachePolicies(
        options.max_cache_size,
        options.min_free_space,
        options.max_items,
        # 3 weeks.
        max_age_secs=21*24*60*60)

    # |options.cache| path may not exist until DiskContentAddressedCache()
    # instance is created.
    return local_caching.DiskContentAddressedCache(
        six.text_type(os.path.abspath(options.cache)), policies, trim,
        **kwargs)
  return local_caching.MemoryContentAddressedCache()

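# Example (illustrative): CMDdownload builds its cache with
#
#   cache = process_cache_options(options, trim=True)
#
# which returns a DiskContentAddressedCache rooted at |options.cache|, or a
# MemoryContentAddressedCache when --cache is explicitly set to an empty
# value.
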
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  net.set_user_agent('isolateserver.py/' + __version__)
  sys.exit(main(sys.argv[1:]))