#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import net
from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand
import six
from six.moves import queue as Queue

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to the log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousands files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


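# Illustrative sketch (not part of the original file): how the batch sizes
# above progress as upload_items() below consumes them. The index is clamped
# to the last entry, so steady state uses batches of 100.
#
#   for i in range(8):
#     size = ITEMS_PER_CONTAINS_QUERIES[
#         min(i, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
#     # -> 20, 20, 50, 50, 50, 100, 100, 100
#

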
# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


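# Illustrative sketch (not part of the original file): streaming a file's
# content without loading it all in memory. 'some_path' is hypothetical.
#
#   total = 0
#   for chunk in file_read(some_path, chunk_size=64 * 1024):
#     total += len(chunk)
#

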
def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file-like object was created by something outside our control (e.g.
  # the standard library's open("test.txt")), name will end up being a str. We
  # want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, six.text_type):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


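# Illustrative sketch (not part of the original file): copying exactly 'size'
# bytes between two file objects; the paths are hypothetical.
#
#   with fs.open(src_path, 'rb') as src, fs.open(dst_path, 'wb') as dst:
#     fileobj_copy(dst, src, size=fs.stat(src_path).st_size)
#

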
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports that ext2 is known to be potentially faulty on a high
    rate of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
    assert fs.exists(dstpath)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE:
    # On windows, mode other than removing stat.S_IWRITE is ignored. Returns
    # early to skip slow/unnecessary chmod call.
    return

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


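# Illustrative sketch (not part of the original file): materializing a cached
# file at a destination, letting putfile() pick hardlink vs. copy. The paths
# and mode are hypothetical.
#
#   with fs.open(cache_path, 'rb') as src:
#     putfile(src, dst_path, file_mode=0o500)
#

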
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')


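# Illustrative sketch (not part of the original file): round-tripping a buffer
# through the two generators above.
#
#   original = [b'x' * 100000]
#   compressed = list(zip_compress(original))
#   assert b''.join(zip_decompress(compressed)) == b''.join(original)
#

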
def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  file_ext = os.path.splitext(filename)[1].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Builds the complete set of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""
  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()


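# Illustrative sketch (not part of the original file): one thread writes into
# a _ThreadFile while another iterates over it, mirroring TarBundle.content()
# below.
#
#   obj = _ThreadFile()
#   def _writer():
#     obj.write(b'hello')
#     obj.close()
#   threading.Thread(target=_writer).start()
#   data = b''.join(obj)  # iterates until close() is called
#

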
class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, 0, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)


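# Illustrative sketch (not part of the original file): the digest is computed
# lazily on first access. hashlib and the path are hypothetical here.
#
#   import hashlib
#   item = FileItem('/tmp/foo.txt', hashlib.sha1)
#   item.digest  # hashes the file on first use
#   item.size    # from fs.stat(), captured at construction
#

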
class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on the
  fly.
  """

  def __init__(self, root, algo):
    # 2 trailing 512 bytes headers.
    super(TarBundle, self).__init__(size=1024)
    self._items = []
    self._meta = None
    self._algo = algo
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Tries to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should
    be migrated to use the Go code instead.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # https://crbug.com/825418
    return False
    self._size += rounded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Yields tuples (Item, filepath, meta_dict).

    If the bundle contains fewer than 5 items, the individual items are
    yielded instead of the bundle itself.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
    def _tar_thread():
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
        'h': self.digest,
        's': self.size,
        't': u'tar',
    }


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]


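# Illustrative sketch (not part of the original file): in-memory data as an
# uploadable item; its digest is computed eagerly, unlike FileItem's.
#
#   import hashlib
#   item = BufferItem(b'{"version": 1}', hashlib.sha1)
#   item.digest, item.size  # known immediately
#

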
class Storage(object):
  """Efficiently downloads or uploads large sets of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
      if sys.maxsize <= max_size:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread-safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items, verify_push=False):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes them to the first thread.
    Then it join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
            |
            v
    _create_items_batches_thread                      Thread #1
    (generates list(Item), every 3s or 20~100 items)
            |
            v
    _do_lookups_thread                                Thread #2
            |         |
            v         v
        (missing)  (was on server)
            |
            v
    _handle_missing_thread                            Thread #3
            |
            v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
          upload.
      verify_push: verify files are uploaded correctly by fetching from server.

    Returns:
      List of items that were uploaded. All other items are already there.

    Raises:
      The first exception being raised in the worker threads.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []
    exc_channel = threading_utils.TaskChannel()

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing, to_upload
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.items():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().items():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      try:
        with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
          channel = threading_utils.TaskChannel()
          pending_upload = 0
          while not self._aborted:
            try:
              missing_item, push_state = missing.get(True, timeout=5)
              if missing_item is None:
                break
              self._async_push(channel, missing_item, push_state, verify_push)
              pending_upload += 1
            except Queue.Empty:
              pass
            detector.ping()
            while not self._aborted and pending_upload:
              try:
                item = channel.next(timeout=0)
              except threading_utils.TaskChannel.Timeout:
                break
              uploaded.append(item)
              pending_upload -= 1
              logging.debug('Uploaded %d; %d pending: %s (%d)', len(uploaded),
                            pending_upload, item.digest, item.size)
          while not self._aborted and pending_upload:
            item = channel.next()
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
      except Exception:
        exc_channel.send_exception()

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point
      # of view of the isolate server (it doesn't care about paths at all,
      # only content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. This is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()
      exc_channel.send_done()
      for _ in exc_channel:
        # If there is no exception, this loop does nothing. Otherwise, it
        # raises the first exception put onto |exc_channel|.
        pass

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded

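  # Illustrative sketch (not part of the original file): the typical
  # upload_items() flow, with a hypothetical server_ref and file list.
  #
  #   with get_storage(server_ref) as storage:
  #     items = [FileItem(p, server_ref.hash_algo) for p in paths]
  #     uploaded = storage.upload_items(items)
  #
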
  def _async_push(self, channel, item, push_state, verify_push=False):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).
      verify_push: verify files are uploaded correctly by fetching from server.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      if verify_push:
        self._fetch(
            item.digest,
            item.size,
            # this consumes all elements from given generator.
            lambda gen: collections.deque(gen, maxlen=0))
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the file is fully zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        # In Python3, zlib.compress returns a bytes object instead of str.
        data = six.b('').join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def _fetch(self, digest, size, sink):
    try:
      # Prepare reading pipeline.
      stream = self._storage_api.fetch(digest, size, 0)
      if self.server_ref.is_with_compression:
        stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
      # Run |stream| through verifier that will assert its size.
      verifier = FetchStreamVerifier(stream, self.server_ref.hash_algo, digest,
                                     size)
      # Verified stream goes to |sink|.
      sink(verifier.run())
    except Exception:
      logging.exception('Failed to fetch %s', digest)
      raise

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      self._fetch(digest, size, sink)
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, the item is removed from the waited-on set and
    returned.

    This function is called in two waves. In the first wave it is called for
    HIGH priority items, the isolated files themselves. In the second wave it
    is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)


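# Illustrative sketch (not part of the original file): the wait_on()/wait()
# pattern used by IsolatedBundle.fetch() below. 'storage', 'cache' and
# 'wanted' are hypothetical.
#
#   fetch_queue = FetchQueue(storage, cache)
#   for digest, size in wanted:
#     fetch_queue.wait_on(digest)
#     fetch_queue.add(digest, size)
#   while not fetch_queue.wait_queue_empty:
#     ready_digest = fetch_queue.wait()
#

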
class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)


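# Illustrative sketch (not part of the original file): verifying a known
# payload; a wrong digest or size makes run() raise IOError on the last chunk.
#
#   import hashlib
#   payload = b'hello'
#   verifier = FetchStreamVerifier(
#       [payload], hashlib.sha1,
#       hashlib.sha1(payload).hexdigest(), len(payload))
#   assert b''.join(verifier.run()) == payload
#

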
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001107class IsolatedBundle(object):
1108 """Fetched and parsed .isolated file with all dependencies."""
1109
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001110 def __init__(self, filter_cb):
1111 """
1112 filter_cb: callback function to filter downloaded content.
1113 When filter_cb is not None, Isolated file is downloaded iff
1114 filter_cb(filepath) returns True.
1115 """
1116
Vadim Shtayura3148e072014-09-02 18:51:52 -07001117 self.command = []
1118 self.files = {}
1119 self.read_only = None
1120 self.relative_cwd = None
1121 # The main .isolated file, a IsolatedFile instance.
1122 self.root = None
1123
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001124 self._filter_cb = filter_cb
1125
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001126 def fetch(self, fetch_queue, root_isolated_hash, algo):
1127 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001128
1129 It enables support for "included" .isolated files. They are processed in
1130 strict order but fetched asynchronously from the cache. This is important so
1131 that a file in an included .isolated file that is overridden by an embedding
1132 .isolated file is not fetched needlessly. The includes are fetched in one
1133 pass and the files are fetched as soon as all the ones on the left-side
1134 of the tree were fetched.
1135
1136 The prioritization is very important here for nested .isolated files.
1137 'includes' have the highest priority and the algorithm is optimized for both
1138 deep and wide trees. A deep one is a long link of .isolated files referenced
1139 one at a time by one item in 'includes'. A wide one has a large number of
1140 'includes' in a single .isolated file. 'left' is defined as an included
1141 .isolated file earlier in the 'includes' list. So the order of the elements
1142 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001143
1144 As a side effect this method starts asynchronous fetch of all data files
1145 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1146 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001147 """
1148 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1149
1150 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1151 pending = {}
1152 # Set of hashes of already retrieved items to refuse recursive includes.
1153 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001154 # Set of IsolatedFile's whose data files have already being fetched.
1155 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001156
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001157 def retrieve_async(isolated_file):
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001158 """Retrieves an isolated file included by the root bundle."""
Vadim Shtayura3148e072014-09-02 18:51:52 -07001159 h = isolated_file.obj_hash
1160 if h in seen:
1161 raise isolated_format.IsolatedError(
1162 'IsolatedFile %s is retrieved recursively' % h)
1163 assert h not in pending
1164 seen.add(h)
1165 pending[h] = isolated_file
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001166 # This isolated item is being added dynamically, notify FetchQueue.
1167 fetch_queue.wait_on(h)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001168 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1169
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001170 # Start fetching root *.isolated file (single file, not the whole bundle).
1171 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001172
1173 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001174 # Wait until some *.isolated file is fetched, parse it.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001175 item_hash = fetch_queue.wait()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001176 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001177 with fetch_queue.cache.getfileobj(item_hash) as f:
1178 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001179
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001180 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001181 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001182 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001183
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001184 # Always fetch *.isolated files in traversal order, waiting if necessary
1185 # until next to-be-processed node loads. "Waiting" is done by yielding
1186 # back to the outer loop, that waits until some *.isolated is loaded.
1187 for node in isolated_format.walk_includes(self.root):
1188 if node not in processed:
1189 # Not visited, and not yet loaded -> wait for it to load.
1190 if not node.is_loaded:
1191 break
1192 # Not visited and loaded -> process it and continue the traversal.
1193 self._start_fetching_files(node, fetch_queue)
1194 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001195
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001196 # All *.isolated files should be processed by now and only them.
1197 all_isolateds = set(isolated_format.walk_includes(self.root))
1198 assert all_isolateds == processed, (all_isolateds, processed)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001199 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
Vadim Shtayura3148e072014-09-02 18:51:52 -07001200
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001201 # Extract 'command' and other bundle properties.
1202 for node in isolated_format.walk_includes(self.root):
1203 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001204 self.relative_cwd = self.relative_cwd or ''
1205
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001206 def _start_fetching_files(self, isolated, fetch_queue):
1207 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001208
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001209 Modifies self.files.
1210 """
maruel10bea7b2016-12-07 05:03:49 -08001211 files = isolated.data.get('files', {})
1212 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001213 for filepath, properties in files.items():
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001214 if self._filter_cb and not self._filter_cb(filepath):
1215 continue
1216
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001217 # The root isolated takes priority over the files being mapped. In
1218 # particular, overridden files must not be fetched.
1219 if filepath not in self.files:
1220 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001221
1222 # Make sure that if the isolated is read-only, the mode doesn't have
1223 # write bits.
1224 if 'm' in properties and self.read_only:
1225 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1226
1227 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001228 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001229 fetch_queue.add(
1230 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1231
1232 def _update_self(self, node):
1233 """Extracts bundle global parameters from loaded *.isolated file.
1234
1235 Will be called with each loaded *.isolated file in order of traversal of
1236 isolated include graph (see isolated_format.walk_includes).
1237 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001238 # Grabs properties.
1239 if not self.command and node.data.get('command'):
1240 # Ensure paths are correctly separated on Windows.
1241 self.command = node.data['command']
1242 if self.command:
1243 self.command[0] = self.command[0].replace('/', os.path.sep)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001244 if self.read_only is None and node.data.get('read_only') is not None:
1245 self.read_only = node.data['read_only']
1246 if (self.relative_cwd is None and
1247 node.data.get('relative_cwd') is not None):
1248 self.relative_cwd = node.data['relative_cwd']
1249
1250
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001251def get_storage(server_ref):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001252 """Returns a Storage instance that can upload to and download from |server_ref|.
1253
1254 Arguments:
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001255 server_ref: isolate_storage.ServerRef instance.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001256
1257 Returns:
1258 Instance of Storage.
1259 """
Lei Lei73a5f732020-03-23 20:36:14 +00001260 # Also accept a duck-typed ServerRef, to handle a specific internal use case.
1261 assert (isinstance(server_ref, isolate_storage.ServerRef) or
1262 type(server_ref).__name__ == 'ServerRef'), repr(server_ref)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001263 return Storage(isolate_storage.get_storage_api(server_ref))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001264
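# A minimal usage sketch for get_storage(); the server URL below is an
# illustrative assumption. The returned Storage doubles as a context manager,
# as CMDarchive and CMDdownload below demonstrate:
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     ...  # e.g. storage.upload_items(...) or storage.async_fetch(...).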
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001265
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001266def _map_file(dst, digest, props, cache, read_only, use_symlinks):
1267 """Put downloaded file to destination path. This function is used for multi
1268 threaded file putting.
1269 """
Takuto Ikuta523c6472019-09-18 02:53:34 +00001270 with tools.Profiler("_map_file for %s" % dst):
1271 with cache.getfileobj(digest) as srcfileobj:
1272 filetype = props.get('t', 'basic')
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001273
Takuto Ikuta523c6472019-09-18 02:53:34 +00001274 if filetype == 'basic':
1275 # Ignore all bits apart from the user.
1276 file_mode = (props.get('m') or 0o500) & 0o700
1277 if read_only:
1278 # Enforce read-only if the root bundle does.
1279 file_mode &= 0o500
1280 putfile(srcfileobj, dst, file_mode, use_symlink=use_symlinks)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001281
Takuto Ikuta523c6472019-09-18 02:53:34 +00001282 elif filetype == 'tar':
1283 basedir = os.path.dirname(dst)
1284 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1285 ensured_dirs = set()
1286 for ti in t:
1287 if not ti.isfile():
1288 logging.warning('Path(%r) is nonfile (%s), skipped', ti.name,
1289 ti.type)
1290 continue
1291 # Handle files created on Windows fetched on POSIX and the
1292 # reverse.
1293 other_sep = '/' if os.path.sep == '\\' else '\\'
1294 name = ti.name.replace(other_sep, os.path.sep)
1295 fp = os.path.normpath(os.path.join(basedir, name))
1296 if not fp.startswith(basedir):
1297 logging.error('Path(%r) is outside root directory', fp)
1298 ifd = t.extractfile(ti)
1299 fp_dir = os.path.dirname(fp)
1300 if fp_dir not in ensured_dirs:
1301 file_path.ensure_tree(fp_dir)
1302 ensured_dirs.add(fp_dir)
1303 file_mode = ti.mode & 0o700
1304 if read_only:
1305 # Enforce read-only if the root bundle does.
1306 file_mode &= 0o500
1307 putfile(ifd, fp, file_mode, ti.size)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001308
Takuto Ikuta523c6472019-09-18 02:53:34 +00001309 else:
1310 raise isolated_format.IsolatedError('Unknown file type %r' % filetype)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001311
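# For reference, a hedged sketch of the |props| dict consumed above, matching
# a 'files' entry of a *.isolated document (values are illustrative):
#
#   props = {
#     'h': u'0123456789abcdef0123456789abcdef01234567',  # content digest
#     's': 4096,     # size in bytes
#     'm': 0o750,    # optional POSIX mode bits
#     't': 'basic',  # optional file type; 'basic' (default) or 'tar'
#   }
#   _map_file(os.path.join(outdir, relpath), props['h'], props, cache,
#             read_only=False, use_symlinks=False)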
1312
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001313def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1314 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001315 """Aggressively downloads the .isolated file(s), then downloads all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001316
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001317 Arguments:
1318 isolated_hash: hash of the root *.isolated file.
1319 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001320 cache: ContentAddressedCache class that knows how to store and map files
1321 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001322 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001323 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001324 filter_cb: callable that acts as a whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001325
1326 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001327 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001328 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001329 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001330 'fetch_isolated(%s, %s, %s, %s, %s)',
1331 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001332 # Hash algorithm to use, defined by namespace |storage| is using.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001333 algo = storage.server_ref.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001334 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001335 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001336
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001337 with tools.Profiler('GetIsolateds'):
1338 # Optionally support local files by manually adding them to cache.
1339 if not isolated_format.is_valid_hash(isolated_hash, algo):
1340 logging.debug('%s is not a valid hash, assuming a file '
1341 '(algo was %s, hash size was %d)',
1342 isolated_hash, algo(), algo().digest_size)
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001343 path = six.text_type(os.path.abspath(isolated_hash))
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001344 try:
1345 isolated_hash = fetch_queue.inject_local_file(path, algo)
1346 except IOError as e:
1347 raise isolated_format.MappingError(
1348 '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1349 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001350
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001351 # Load all *.isolated and start loading rest of the files.
1352 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001353
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001354 with tools.Profiler('GetRest'):
1355 # Create file system hierarchy.
1356 file_path.ensure_tree(outdir)
1357 create_directories(outdir, bundle.files)
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001358 _create_symlinks(outdir, bundle.files.items())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001359
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001360 # Ensure working directory exists.
1361 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
1362 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001363
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001364 # Multimap: digest -> list of pairs (path, props).
1365 remaining = {}
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001366 for filepath, props in bundle.files.items():
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001367 if 'h' in props:
1368 remaining.setdefault(props['h'], []).append((filepath, props))
1369 fetch_queue.wait_on(props['h'])
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001370
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001371 # Now block on the remaining files to be downloaded and mapped.
1372 logging.info('Retrieving remaining files (%d of them)...',
1373 fetch_queue.pending_count)
1374 last_update = time.time()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001375
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001376 with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
1377 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1378 while remaining:
1379 detector.ping()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001380
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001381 # Wait for any item to finish fetching to cache.
1382 digest = fetch_queue.wait()
tansell9e04a8d2016-07-28 09:31:59 -07001383
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001384 # Create the files in the destination using item in cache as the
1385 # source.
1386 for filepath, props in remaining.pop(digest):
1387 fullpath = os.path.join(outdir, filepath)
tanselle4288c32016-07-28 09:45:40 -07001388
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001389 putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
1390 _map_file, fullpath, digest,
1391 props, cache, bundle.read_only,
1392 use_symlinks)
tanselle4288c32016-07-28 09:45:40 -07001393
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001394 # Report progress.
1395 duration = time.time() - last_update
1396 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1397 msg = '%d files remaining...' % len(remaining)
1398 sys.stdout.write(msg + '\n')
1399 sys.stdout.flush()
1400 logging.info(msg)
1401 last_update = time.time()
1402 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
1403 putfile_thread_pool.join()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001404
Marc-Antoine Ruele9558372018-08-03 03:41:22 +00001405 # Save the cache right away so as not to lose the state of the new objects.
1406 cache.save()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001407 # The cache could evict some items we just tried to fetch; that's a fatal error.
1408 if not fetch_queue.verify_all_cached():
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001409 free_disk = file_path.get_free_space(cache.cache_dir)
1410 msg = (
1411 'Cache is too small to hold all requested files.\n'
1412 ' %s\n cache=%d bytes, %d items; %s bytes free_space') % (
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +00001413 cache.policies, cache.total_size, len(cache), free_disk)
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001414 raise isolated_format.MappingError(msg)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001415 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001416
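# A hedged programmatic sketch mirroring CMDdownload below; the server URL,
# digest and output directory are illustrative assumptions.
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   cache = local_caching.MemoryContentAddressedCache()
#   with get_storage(server_ref) as storage:
#     bundle = fetch_isolated(
#         isolated_hash=u'0123456789abcdef0123456789abcdef01234567',
#         storage=storage,
#         cache=cache,
#         outdir=u'/tmp/run',
#         use_symlinks=False)
#   if bundle.command:
#     print(' '.join(bundle.command))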
1417
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001418def _directory_to_metadata(root, algo, blacklist):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001419 """Yields every file and/or symlink found.
1420
1421 Yields:
1422 tuple(FileItem, relpath, metadata)
1423 For a symlink, FileItem is None.
1424 """
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001425 # Current tar file bundle, if any.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001426 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001427 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001428 for relpath, issymlink in isolated_format.expand_directory_and_symlink(
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001429 root,
1430 u'.' + os.path.sep,
1431 blacklist,
1432 follow_symlinks=(sys.platform != 'win32')):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001433
1434 filepath = os.path.join(root, relpath)
1435 if issymlink:
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001436 # TODO(maruel): Do not call this.
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001437 meta = isolated_format.file_to_metadata(filepath, 0, False)
1438 yield None, relpath, meta
1439 continue
1440
1441 prio = relpath.endswith('.isolated')
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001442 if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
1443 # The file was added to the current pending tarball and won't be archived
1444 # individually.
1445 continue
1446
1447 # Flush and reset the bundle.
1448 for i, p, m in bundle.yield_item_path_meta():
1449 yield i, p, m
1450 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001451
1452 # Yield the file individually.
1453 item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
1454 yield item, relpath, item.meta
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001455
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001456 for i, p, m in bundle.yield_item_path_meta():
1457 yield i, p, m
1458
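# A consuming sketch for _directory_to_metadata(); the directory path is an
# illustrative assumption and |algo| would normally come from
# server_ref.hash_algo (e.g. SHA-1 for the 'default-gzip' namespace). Note
# that symlinks come back with item=None and only their metadata.
#
#   for item, relpath, meta in _directory_to_metadata(
#       u'/path/to/dir', algo, tools.gen_blacklist(DEFAULT_BLACKLIST)):
#     if item is None:
#       logging.info('symlink %s -> %s', relpath, meta)
#     else:
#       logging.info('%s -> %s', relpath, item.digest)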
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001459
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001460def _print_upload_stats(items, missing):
1461 """Prints upload stats."""
1462 total = len(items)
1463 total_size = sum(f.size for f in items)
1464 logging.info(
1465 'Total: %6d, %9.1fkiB', total, total_size / 1024.)
1466 cache_hit = set(items).difference(missing)
1467 cache_hit_size = sum(f.size for f in cache_hit)
1468 logging.info(
1469 'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1470 len(cache_hit),
1471 cache_hit_size / 1024.,
1472 len(cache_hit) * 100. / total,
1473 cache_hit_size * 100. / total_size if total_size else 0)
1474 cache_miss = missing
1475 cache_miss_size = sum(f.size for f in cache_miss)
1476 logging.info(
1477 'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1478 len(cache_miss),
1479 cache_miss_size / 1024.,
1480 len(cache_miss) * 100. / total,
1481 cache_miss_size * 100. / total_size if total_size else 0)
1482
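# Worked example of the stats above: given 10 items totalling 1000 bytes, of
# which 4 items and 300 bytes were missing from the server, the cache hit
# line reports 6 items, 60.00% files and 70.00% size, and the cache miss
# line reports 4 items, 40.00% files and 30.00% size.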
1483
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001484def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001485 """Called by archive_files_to_storage for a directory.
1486
1487 Creates an .isolated file.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001488
1489 Yields:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001490 FileItem for every file found, plus one for the .isolated file itself.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001491 """
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001492 files = {}
1493 for item, relpath, meta in _directory_to_metadata(
1494 dirpath, hash_algo, blacklist):
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001495 # item is None for a symlink.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001496 files[relpath] = meta
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001497 if item:
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001498 yield item
1499
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001500 # TODO(maruel): If there's no file, don't yield an .isolated file.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001501 data = {
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001502 'algo': hash_algo_name,
1503 'files': files,
1504 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001505 }
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001506 # Keep the file in memory. This is fine because .isolated files are relatively
1507 # small.
1508 yield BufferItem(
1509 tools.format_json(data, True), algo=hash_algo, high_priority=True)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001510
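# For reference, a hedged sketch of the in-memory .isolated document built
# above; keys follow the isolated file format ('l' marks a symlink target),
# values are illustrative:
#
#   {
#     'algo': 'sha-1',
#     'files': {
#       u'foo/bar.txt': {'h': u'<40 hex chars>', 's': 123, 'm': 0o600},
#       u'foo/link': {'l': u'bar.txt'},
#     },
#     'version': isolated_format.ISOLATED_FILE_VERSION,
#   }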
1511
Takuto Ikuta26980872020-04-09 06:56:37 +00001512def archive_files_to_storage(storage, files, blacklist, verify_push=False):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001513 """Stores every entry into remote storage and returns stats.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001514
1515 Arguments:
1516 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001517 files: iterable of files to upload. If a directory is specified (with a
1518 trailing slash), a .isolated file is created and its hash is returned.
1519 Duplicates are skipped.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001520 blacklist: function that returns True if a file should be omitted.
Takuto Ikuta26980872020-04-09 06:56:37 +00001521 verify_push: verify files are uploaded correctly by fetching from server.
maruel064c0a32016-04-05 11:47:15 -07001522
1523 Returns:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001524 tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001525 The first file in the first item is always the .isolated file.
Ye Kuang4e994292020-04-10 07:07:35 +00001526
1527 Raises:
1528 Re-raises the exception in upload_items(), if there is any.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001529 """
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001530 # Dict of path to hash.
1531 results = collections.OrderedDict()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001532 hash_algo = storage.server_ref.hash_algo
1533 hash_algo_name = storage.server_ref.hash_algo_name
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001534 # Generator of FileItem to pass to upload_items() concurrent operation.
1535 channel = threading_utils.TaskChannel()
Ye Kuang4e994292020-04-10 07:07:35 +00001536 exc_channel = threading_utils.TaskChannel()
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001537 uploaded_digests = set()
Ye Kuang4e994292020-04-10 07:07:35 +00001538
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001539 def _upload_items():
Ye Kuang4e994292020-04-10 07:07:35 +00001540 try:
1541 uploaded = storage.upload_items(channel, verify_push)
1542 uploaded_digests.update(f.digest for f in uploaded)
1543 except Exception:
1544 exc_channel.send_exception()
1545
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001546 t = threading.Thread(target=_upload_items)
1547 t.start()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001548
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001549 # Keep track locally of the items to determine cold and hot items.
1550 items_found = []
1551 try:
1552 for f in files:
Takuto Ikuta95459dd2019-10-29 12:39:47 +00001553 assert isinstance(f, six.text_type), repr(f)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001554 if f in results:
1555 # Duplicate
1556 continue
1557 try:
1558 filepath = os.path.abspath(f)
1559 if fs.isdir(filepath):
1560 # Uploading a whole directory.
1561 item = None
1562 for item in _enqueue_dir(
1563 filepath, blacklist, hash_algo, hash_algo_name):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001564 channel.send_result(item)
1565 items_found.append(item)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001566 # The very last item will be the .isolated file.
1567 if not item:
1568 # There was no file in the directory.
1569 continue
1570 elif fs.isfile(filepath):
1571 item = FileItem(
1572 path=filepath,
1573 algo=hash_algo,
1574 size=None,
1575 high_priority=f.endswith('.isolated'))
1576 channel.send_result(item)
1577 items_found.append(item)
1578 else:
1579 raise Error('%s is neither a file nor a directory.' % f)
1580 results[f] = item.digest
1581 except OSError:
1582 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001583 finally:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001584 # Stops the generator, so _upload_items() can exit.
1585 channel.send_done()
1586 t.join()
Ye Kuang4e994292020-04-10 07:07:35 +00001587 exc_channel.send_done()
1588 for _ in exc_channel:
1589 pass
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001590
1591 cold = []
1592 hot = []
1593 for i in items_found:
1594 # Note that multiple FileItem may have the same .digest.
1595 if i.digest in uploaded_digests:
1596 cold.append(i)
1597 else:
1598 hot.append(i)
1599 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001600
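# A hedged end-to-end sketch; the server URL and paths are illustrative. A
# trailing path separator marks a directory, per the docstring above.
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'out/Release' + os.path.sep], tools.gen_blacklist([]))
#   # results maps each input path to its digest; for a directory this is
#   # the digest of the generated .isolated file.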
1601
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001602@subcommand.usage('<file1..fileN> or - to read from stdin')
1603def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001604 """Archives data to the server.
1605
1606 If a directory is specified, a .isolated file is created and the whole
1607 directory is uploaded. Then this .isolated file can be included in another
1608 one to run commands.
1609
1610 The command outputs each file that was processed with its content hash. For
1611 directories, the .isolated generated for the directory is listed as the
1612 directory entry itself.
1613 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001614 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001615 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001616 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001617 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001618 server_ref = isolate_storage.ServerRef(
1619 options.isolate_server, options.namespace)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001620 if files == ['-']:
1621 files = (l.rstrip('\n\r') for l in sys.stdin)
1622 if not files:
1623 parser.error('Nothing to upload')
1624 files = (f.decode('utf-8') for f in files)
1625 blacklist = tools.gen_blacklist(options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001626 try:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001627 with get_storage(server_ref) as storage:
1628 results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001629 except (Error, local_caching.NoMoreSpace) as e:
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001630 parser.error(e.args[0])
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001631 print('\n'.join('%s %s' % (h, f) for f, h in results.items()))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001632 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001633
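# Example invocation (the server URL is illustrative); prints one
# "<hash> <path>" line per processed argument:
#
#   python isolateserver.py archive -I https://isolate.example.com \
#       --namespace default-gzip out/somedir/ some/file.bin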
1634
1635def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001636 """Download data from the server.
1637
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001638 It can either download individual files or a complete tree from a .isolated
1639 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001640 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001641 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001642 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05001643 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001644 help='hash of an isolated file, .isolated file content is discarded, use '
1645 '--file if you need it')
1646 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001647 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1648 help='hash and destination of a file, can be used multiple times')
1649 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001650 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001651 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07001652 parser.add_option(
1653 '--use-symlinks', action='store_true',
1654 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001655 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001656 options, args = parser.parse_args(args)
1657 if args:
1658 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruel5028ba22017-08-25 17:37:51 -04001659 if not file_path.enable_symlink():
Marc-Antoine Ruel5a024272019-01-15 20:11:16 +00001660 logging.warning('Symlink support is not enabled')
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001661
nodir55be77b2016-05-03 09:39:57 -07001662 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001663 if bool(options.isolated) == bool(options.file):
1664 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07001665 if not options.cache and options.use_symlinks:
1666 parser.error('--use-symlinks requires the use of a cache with --cache')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001667
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001668 cache = process_cache_options(options, trim=True)
maruel2e8d0f52016-07-16 07:51:29 -07001669 cache.cleanup()
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001670 options.target = six.text_type(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001671 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07001672 if (fs.isfile(options.target) or
1673 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001674 parser.error(
1675 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001676 server_ref = isolate_storage.ServerRef(
1677 options.isolate_server, options.namespace)
1678 with get_storage(server_ref) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08001679 # Fetching individual files.
1680 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001681 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08001682 channel = threading_utils.TaskChannel()
1683 pending = {}
1684 for digest, dest in options.file:
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001685 dest = six.text_type(dest)
Vadim Shtayura3172be52013-12-03 12:49:05 -08001686 pending[digest] = dest
1687 storage.async_fetch(
1688 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07001689 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001690 digest,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001691 local_caching.UNKNOWN_FILE_SIZE,
1692 functools.partial(
1693 local_caching.file_write, os.path.join(options.target, dest)))
Vadim Shtayura3172be52013-12-03 12:49:05 -08001694 while pending:
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +00001695 fetched = channel.next()
Vadim Shtayura3172be52013-12-03 12:49:05 -08001696 dest = pending.pop(fetched)
1697 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001698
Vadim Shtayura3172be52013-12-03 12:49:05 -08001699 # Fetching whole isolated tree.
1700 if options.isolated:
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001701 bundle = fetch_isolated(
1702 isolated_hash=options.isolated,
1703 storage=storage,
1704 cache=cache,
1705 outdir=options.target,
1706 use_symlinks=options.use_symlinks)
1707 cache.trim()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001708 if bundle.command:
1709 rel = os.path.join(options.target, bundle.relative_cwd)
1710 print('To run this test please run from the directory %s:' % rel)
1712 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001713
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001714 return 0
1715
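# Example invocations (server URL and digest are illustrative):
#
#   python isolateserver.py download -I https://isolate.example.com \
#       -s 0123456789abcdef0123456789abcdef01234567 -t out/
#   python isolateserver.py download -I https://isolate.example.com \
#       -f 0123456789abcdef0123456789abcdef01234567 payload.bin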
1716
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001717def add_archive_options(parser):
1718 parser.add_option(
1719 '--blacklist',
1720 action='append', default=list(DEFAULT_BLACKLIST),
1721 help='List of regexp to use as blacklist filter when uploading '
1722 'directories')
1723
1724
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001725def add_isolate_server_options(parser):
1726 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001727 parser.add_option(
1728 '-I', '--isolate-server',
1729 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001730 help='URL of the Isolate Server to use. Defaults to the environment '
1731 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1732 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001733 parser.add_option(
aludwind7b7b7e2017-06-29 16:38:50 -07001734 '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
aludwin81178302016-11-30 17:18:49 -08001735 parser.add_option(
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001736 '--namespace', default='default-gzip',
1737 help='The namespace to use on the Isolate Server, default: %default')
1738
1739
nodir55be77b2016-05-03 09:39:57 -07001740def process_isolate_server_options(
1741 parser, options, set_exception_handler, required):
1742 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001743
1744 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001745 """
1746 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001747 if required:
1748 parser.error('--isolate-server is required.')
1749 return
1750
aludwind7b7b7e2017-06-29 16:38:50 -07001751 if options.grpc_proxy:
1752 isolate_storage.set_grpc_proxy(options.grpc_proxy)
aludwin81178302016-11-30 17:18:49 -08001753 else:
1754 try:
1755 options.isolate_server = net.fix_url(options.isolate_server)
1756 except ValueError as e:
1757 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001758 if set_exception_handler:
1759 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001760 try:
1761 return auth.ensure_logged_in(options.isolate_server)
1762 except ValueError as e:
1763 parser.error(str(e))
Marc-Antoine Ruel793bff32019-04-18 17:50:48 +00001764 return None
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001765
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001766
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001767def add_cache_options(parser):
1768 cache_group = optparse.OptionGroup(parser, 'Cache management')
1769 cache_group.add_option(
Marc-Antoine Ruel5aeb3bb2018-06-16 13:11:02 +00001770 '--cache', metavar='DIR', default='cache',
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001771 help='Directory to keep a local cache of the files. Accelerates download '
1772 'by reusing already downloaded files. Default=%default')
1773 cache_group.add_option(
1774 '--max-cache-size',
1775 type='int',
1776 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08001777 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001778 help='Trim if the cache gets larger than this value, default=%default')
1779 cache_group.add_option(
1780 '--min-free-space',
1781 type='int',
1782 metavar='NNN',
1783 default=2*1024*1024*1024,
1784 help='Trim if disk free space becomes lower than this value, '
1785 'default=%default')
1786 cache_group.add_option(
1787 '--max-items',
1788 type='int',
1789 metavar='NNN',
1790 default=100000,
1791 help='Trim if more than this number of items are in the cache, '
1792 'default=%default')
1793 parser.add_option_group(cache_group)
1794
1795
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001796def process_cache_options(options, trim, **kwargs):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001797 if options.cache:
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -04001798 policies = local_caching.CachePolicies(
1799 options.max_cache_size,
1800 options.min_free_space,
1801 options.max_items,
1802 # 3 weeks.
1803 max_age_secs=21*24*60*60)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001804
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001805 # |options.cache| path may not exist until DiskContentAddressedCache()
1806 # instance is created.
1807 return local_caching.DiskContentAddressedCache(
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001808 six.text_type(os.path.abspath(options.cache)), policies, trim, **kwargs)
Marc-Antoine Ruel793bff32019-04-18 17:50:48 +00001809 return local_caching.MemoryContentAddressedCache()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001810
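# A hedged sketch of building the same cache directly, using the option
# defaults above:
#
#   policies = local_caching.CachePolicies(
#       50 * 1024 * 1024 * 1024,  # --max-cache-size
#       2 * 1024 * 1024 * 1024,   # --min-free-space
#       100000,                   # --max-items
#       max_age_secs=21 * 24 * 60 * 60)
#   cache = local_caching.DiskContentAddressedCache(
#       six.text_type(os.path.abspath('cache')), policies, trim=True)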
1811
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001812class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001813 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001814 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001815 self,
1816 version=__version__,
1817 prog=os.path.basename(sys.modules[__name__].__file__),
1818 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001819 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001820
1821 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001822 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001823 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08001824 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001825 return options, args
1826
1827
1828def main(args):
1829 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04001830 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001831
1832
1833if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07001834 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001835 fix_encoding.fix_encoding()
1836 tools.disable_buffering()
1837 colorama.init()
Takuto Ikuta160b4452020-04-15 06:33:55 +00001838 net.set_user_agent('isolateserver.py/' + __version__)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001839 sys.exit(main(sys.argv[1:]))