#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import net
from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand
import six
from six.moves import queue as Queue

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import on_error
from utils import subprocess42
from utils import threading_utils


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60

# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousands files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data

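# Illustrative sketch (not part of the original module): file_read() streams a
# file in bounded chunks, so callers can hash or upload without loading the
# whole file in memory. The file path below is hypothetical.
#
#   import hashlib
#   h = hashlib.sha1()
#   for chunk in file_read('/tmp/example.bin', chunk_size=64 * 1024):
#     h.update(chunk)
#   print(h.hexdigest())

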
def fileobj_path(fileobj):
  """Return file system path for file like object or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file like object was created by something outside our control (for
  # example the standard library's open("test.txt")), name will end up being a
  # str. We want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, six.text_type):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)

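# Illustrative sketch (not part of the original module): fileobj_copy() works
# on any pair of file-like objects, e.g. in-memory buffers. With size=-1 it
# copies until EOF; with an explicit size it raises IOError on short reads.
#
#   import io
#   src = io.BytesIO(b'0123456789')
#   dst = io.BytesIO()
#   fileobj_copy(dst, src)  # copies all 10 bytes
#   assert dst.getvalue() == b'0123456789'

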
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal reports that ext2 is known to be potentially faulty on a high
    rate of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
    assert fs.exists(dstpath)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE:
    # On windows, mode other than removing stat.S_IWRITE is ignored. Returns
    # early to skip slow/unnecessary chmod call.
    return

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)

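# Illustrative sketch (not part of the original module): putfile() accepts any
# file-like object. A plain in-memory buffer has no backing path, so the call
# falls through to the copy branch. The destination path is hypothetical.
#
#   import io
#   putfile(io.BytesIO(b'payload'), '/tmp/out.bin', file_mode=0o500)

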
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

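# Illustrative sketch (not part of the original module): the two generators
# above are inverses of each other, so a chunked stream round-trips:
#
#   chunks = [b'hello ', b'world'] * 1000
#   compressed = list(zip_compress(chunks, level=7))
#   restored = b''.join(zip_decompress(compressed, chunk_size=4096))
#   assert restored == b''.join(chunks)

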
def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # os.path.splitext() keeps the leading dot; strip it so the extension
  # matches the entries in ALREADY_COMPRESSED_TYPES.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7

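# Illustrative sketch (not part of the original module): already compressed
# formats are stored without recompression, everything else gets level 7.
#
#   assert _get_zip_compression_level('photo.JPG') == 0
#   assert _get_zip_compression_level('trace.json') == 7

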
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Builds the set of directories to create, including all the intermediate
  # parents.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)

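# Illustrative sketch (not part of the original module): the parent expansion
# above means a deeply nested file yields every ancestor, and sorting
# guarantees parents are created before children. Paths are hypothetical.
#
#   create_directories('/tmp/out', ['a/b/c.txt', 'a/d.txt'])
#   # creates /tmp/out/a, then /tmp/out/a/b

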
def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""
  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()


class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def algo(self):
    return self._algo

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, 0, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)

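# Illustrative sketch (not part of the original module): a FileItem is lazy;
# the digest is only computed when first accessed. hashlib.sha1 stands in for
# the namespace's hash algorithm; the path is hypothetical.
#
#   import hashlib
#   item = FileItem('/tmp/payload.bin', hashlib.sha1)
#   print(item.size)    # from fs.stat(), no hashing yet
#   print(item.digest)  # hashes the file on first access

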
class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on
  the fly.
  """

  def __init__(self, root, algo):
    # 2 trailing 512 bytes headers.
    super(TarBundle, self).__init__(size=1024)
    self._items = []
    self._meta = None
    self._algo = algo
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Try to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should
    be migrated to use the Go code instead.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # https://crbug.com/825418
    return False
    self._size += rounded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Returns a tuple(Item, filepath, meta_dict).

    If the bundle contains less than 5 items, the items are yielded.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
    def _tar_thread():
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
      'h': self.digest,
      's': self.size,
      't': u'tar',
    }


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]

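# Illustrative sketch (not part of the original module): unlike FileItem,
# a BufferItem is hashed eagerly at construction time.
#
#   import hashlib
#   item = BufferItem(b'{"version": 1}', hashlib.sha1)
#   assert item.digest == hashlib.sha1(b'{"version": 1}').hexdigest()
#   assert item.size == 14

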
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
      if sys.maxsize <= max_size:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items, verify_push=False):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes them to the first thread.
    Then it join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread                      Thread #1
    (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                                Thread #2
          |       |
          v       v
      (missing)  (was on server)
          |
          v
    _handle_missing_thread                            Thread #3
          |
          v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
          upload.
      verify_push: verify files are uploaded correctly by fetching from server.

    Returns:
      List of items that were uploaded. All other items are already there.

    Raises:
      The first exception being raised in the worker threads.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []
    exc_channel = threading_utils.TaskChannel()

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.items():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().items():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      except Exception:
        exc_channel.send_exception()
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      try:
        with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
          channel = threading_utils.TaskChannel()
          pending_upload = 0
          while not self._aborted:
            try:
              missing_item, push_state = missing.get(True, timeout=5)
              if missing_item is None:
                break
              self._async_push(channel, missing_item, push_state, verify_push)
              pending_upload += 1
            except Queue.Empty:
              pass
            detector.ping()
            while not self._aborted and pending_upload:
              try:
                item = channel.next(timeout=0)
              except threading_utils.TaskChannel.Timeout:
                break
              uploaded.append(item)
              pending_upload -= 1
              logging.debug('Uploaded %d; %d pending: %s (%d)', len(uploaded),
                            pending_upload, item.digest, item.size)
          while not self._aborted and pending_upload:
            item = channel.next()
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
      except Exception:
        exc_channel.send_exception()

    threads = [
      threading.Thread(target=_create_items_batches_thread),
      threading.Thread(target=_do_lookups_thread),
      threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only first isolate_storage.Item that matches it.
      # All other items are just indistinguishable copies from the point of
      # view of isolate server (it doesn't care about paths at all, only
      # content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. This is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()
      exc_channel.send_done()
      for _ in exc_channel:
        # If there is no exception, this loop does nothing. Otherwise, it
        # raises the first exception put onto |exc_channel|.
        pass

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded

  def _async_push(self, channel, item, push_state, verify_push=False):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).
      verify_push: verify files are uploaded correctly by fetching from server.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      if verify_push:
        try:
          self._fetch(
              item.digest,
              item.size,
              # this consumes all elements from given generator.
              lambda gen: collections.deque(gen, maxlen=0))
        except Exception:
          # reset push_state if failed to verify.
          push_state.finalized = False
          push_state.uploaded = False
          raise

      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # the content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        # In Python3, zlib.compress returns a bytes object instead of str.
        data = six.b('').join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def _fetch(self, digest, size, sink):
    try:
      # Prepare reading pipeline.
      stream = self._storage_api.fetch(digest, size, 0)
      if self.server_ref.is_with_compression:
        stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
      # Run |stream| through verifier that will assert its size.
      verifier = FetchStreamVerifier(stream, self.server_ref.hash_algo, digest,
                                     size)
      # Verified stream goes to |sink|.
      sink(verifier.run())
    except Exception:
      logging.exception('Failed to fetch %s', digest)
      raise

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      self._fetch(digest, size, sink)
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

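# Illustrative sketch (not part of the original module): Storage is normally
# used as a context manager so thread pools and signal handlers get cleaned
# up, with get_storage() (defined further below) as the entry point.
# upload_items() deduplicates by digest before the /contains RPCs. server_ref,
# item1 and item2 are assumed to be defined elsewhere.
#
#   with get_storage(server_ref) as storage:
#     uploaded = storage.upload_items([item1, item2], verify_push=True)
#     print('%d items were new' % len(uploaded))

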
class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves. In the first wave it is called for
    the HIGH priority items, the isolated files themselves. In the second
    wave it is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)

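# Illustrative sketch (not part of the original module): the caller pattern is
# wait_on()/add() per digest, then a wait() loop until everything the caller
# cares about has landed in the cache. Digests and sizes are hypothetical.
#
#   queue = FetchQueue(storage, cache)
#   for digest, size in (('abc123...', 1024), ('def456...', 2048)):
#     queue.wait_on(digest)
#     queue.add(digest, size)
#   while not queue.wait_queue_empty:
#     print('fetched %s' % queue.wait())

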
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)

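# Illustrative sketch (not part of the original module): run() re-yields the
# stream unchanged and only raises once the last chunk proves the size or
# digest wrong.
#
#   import hashlib
#   chunks = [b'hello ', b'world']
#   digest = hashlib.sha1(b'hello world').hexdigest()
#   verifier = FetchStreamVerifier(iter(chunks), hashlib.sha1, digest, 11)
#   assert b''.join(verifier.run()) == b'hello world'

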
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001118class IsolatedBundle(object):
1119 """Fetched and parsed .isolated file with all dependencies."""
1120
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001121 def __init__(self, filter_cb):
1122 """
1123 filter_cb: callback function to filter downloaded content.
1124 When filter_cb is not None, Isolated file is downloaded iff
1125 filter_cb(filepath) returns True.
1126 """
1127
Vadim Shtayura3148e072014-09-02 18:51:52 -07001128 self.command = []
1129 self.files = {}
1130 self.read_only = None
1131 self.relative_cwd = None
1132 # The main .isolated file, a IsolatedFile instance.
1133 self.root = None
1134
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001135 self._filter_cb = filter_cb
1136
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001137 def fetch(self, fetch_queue, root_isolated_hash, algo):
1138 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001139
1140 It enables support for "included" .isolated files. They are processed in
1141 strict order but fetched asynchronously from the cache. This is important so
1142 that a file in an included .isolated file that is overridden by an embedding
1143 .isolated file is not fetched needlessly. The includes are fetched in one
1144 pass and the files are fetched as soon as all the ones on the left-side
1145 of the tree were fetched.
1146
1147 The prioritization is very important here for nested .isolated files.
1148 'includes' have the highest priority and the algorithm is optimized for both
1149 deep and wide trees. A deep one is a long link of .isolated files referenced
1150 one at a time by one item in 'includes'. A wide one has a large number of
1151 'includes' in a single .isolated file. 'left' is defined as an included
1152 .isolated file earlier in the 'includes' list. So the order of the elements
1153 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001154
1155 As a side effect this method starts asynchronous fetch of all data files
1156 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1157 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001158 """
1159 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1160
1161 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1162 pending = {}
1163 # Set of hashes of already retrieved items to refuse recursive includes.
1164 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001165 # Set of IsolatedFile's whose data files have already being fetched.
1166 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001167
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001168 def retrieve_async(isolated_file):
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001169 """Retrieves an isolated file included by the root bundle."""
Vadim Shtayura3148e072014-09-02 18:51:52 -07001170 h = isolated_file.obj_hash
1171 if h in seen:
1172 raise isolated_format.IsolatedError(
1173 'IsolatedFile %s is retrieved recursively' % h)
1174 assert h not in pending
1175 seen.add(h)
1176 pending[h] = isolated_file
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001177 # This isolated item is being added dynamically, notify FetchQueue.
1178 fetch_queue.wait_on(h)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001179 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1180
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001181 # Start fetching root *.isolated file (single file, not the whole bundle).
1182 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001183
1184 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001185 # Wait until some *.isolated file is fetched, parse it.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001186 item_hash = fetch_queue.wait()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001187 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001188 with fetch_queue.cache.getfileobj(item_hash) as f:
1189 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001190
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001191 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001192 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001193 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001194
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001195 # Always fetch *.isolated files in traversal order, waiting if necessary
1196 # until next to-be-processed node loads. "Waiting" is done by yielding
1197 # back to the outer loop, that waits until some *.isolated is loaded.
1198 for node in isolated_format.walk_includes(self.root):
1199 if node not in processed:
1200 # Not visited, and not yet loaded -> wait for it to load.
1201 if not node.is_loaded:
1202 break
1203 # Not visited and loaded -> process it and continue the traversal.
1204 self._start_fetching_files(node, fetch_queue)
1205 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001206
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001207 # All *.isolated files should be processed by now and only them.
1208 all_isolateds = set(isolated_format.walk_includes(self.root))
1209 assert all_isolateds == processed, (all_isolateds, processed)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001210 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
Vadim Shtayura3148e072014-09-02 18:51:52 -07001211
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001212 # Extract 'command' and other bundle properties.
1213 for node in isolated_format.walk_includes(self.root):
1214 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001215 self.relative_cwd = self.relative_cwd or ''
1216
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001217 def _start_fetching_files(self, isolated, fetch_queue):
1218 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001219
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001220 Modifies self.files.
1221 """
maruel10bea7b2016-12-07 05:03:49 -08001222 files = isolated.data.get('files', {})
1223 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001224 for filepath, properties in files.items():
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001225 if self._filter_cb and not self._filter_cb(filepath):
1226 continue
1227
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001228 # The root isolated has priority over the files being mapped. In
1229 # particular, overridden files must not be fetched.
1230 if filepath not in self.files:
1231 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001232
1233 # If the isolated is read-only, make sure the mode doesn't have write
1234 # bits.
1235 if 'm' in properties and self.read_only:
1236 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1237
1238 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001239 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001240 fetch_queue.add(
1241 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1242
1243 def _update_self(self, node):
1244 """Extracts bundle global parameters from loaded *.isolated file.
1245
1246 Will be called with each loaded *.isolated file in order of traversal of
1247 isolated include graph (see isolated_format.walk_includes).
1248 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001249 # Grabs properties.
1250 if not self.command and node.data.get('command'):
1251 # Ensure paths are correctly separated on windows.
1252 self.command = node.data['command']
1253 if self.command:
1254 self.command[0] = self.command[0].replace('/', os.path.sep)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001255 if self.read_only is None and node.data.get('read_only') is not None:
1256 self.read_only = node.data['read_only']
1257 if (self.relative_cwd is None and
1258 node.data.get('relative_cwd') is not None):
1259 self.relative_cwd = node.data['relative_cwd']
1260
1261
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001262def get_storage(server_ref):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001263 """Returns a Storage instance that can upload to and download from |server_ref|.
1264
1265 Arguments:
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001266 server_ref: isolate_storage.ServerRef instance.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001267
1268 Returns:
1269 Instance of Storage.
1270 """
Lei Lei73a5f732020-03-23 20:36:14 +00001271 # Also accept a ServerRef from a different module instance (internal use case).
1272 assert (isinstance(server_ref, isolate_storage.ServerRef) or
1273 type(server_ref).__name__ == 'ServerRef'), repr(server_ref)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001274 return Storage(isolate_storage.get_storage_api(server_ref))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001275
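# A minimal usage sketch for get_storage(); the server URL and namespace
# below are illustrative placeholders, not real endpoints:
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     ...  # e.g. storage.upload_items(items) or storage.async_fetch(...)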
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001276
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001277def _map_file(dst, digest, props, cache, read_only, use_symlinks):
1278 """Put downloaded file to destination path. This function is used for multi
1279 threaded file putting.
1280 """
Takuto Ikuta523c6472019-09-18 02:53:34 +00001281 with tools.Profiler("_map_file for %s" % dst):
1282 with cache.getfileobj(digest) as srcfileobj:
1283 filetype = props.get('t', 'basic')
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001284
Takuto Ikuta523c6472019-09-18 02:53:34 +00001285 if filetype == 'basic':
1286 # Ignore all bits apart from the user.
1287 file_mode = (props.get('m') or 0o500) & 0o700
1288 if read_only:
1289 # Enforce read-only if the root bundle does.
1290 file_mode &= 0o500
1291 putfile(srcfileobj, dst, file_mode, use_symlink=use_symlinks)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001292
Takuto Ikuta523c6472019-09-18 02:53:34 +00001293 elif filetype == 'tar':
1294 basedir = os.path.dirname(dst)
1295 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1296 ensured_dirs = set()
1297 for ti in t:
1298 if not ti.isfile():
1299 logging.warning('Path(%r) is nonfile (%s), skipped', ti.name,
1300 ti.type)
1301 continue
1302 # Handle files created on Windows fetched on POSIX and the
1303 # reverse.
1304 other_sep = '/' if os.path.sep == '\\' else '\\'
1305 name = ti.name.replace(other_sep, os.path.sep)
1306 fp = os.path.normpath(os.path.join(basedir, name))
1307 if not fp.startswith(basedir):
1308 logging.error('Path(%r) is outside root directory', fp)
# Skip entries that would escape |basedir|.
continue
1309 ifd = t.extractfile(ti)
1310 fp_dir = os.path.dirname(fp)
1311 if fp_dir not in ensured_dirs:
1312 file_path.ensure_tree(fp_dir)
1313 ensured_dirs.add(fp_dir)
1314 file_mode = ti.mode & 0o700
1315 if read_only:
1316 # Enforce read-only if the root bundle does.
1317 file_mode &= 0o500
1318 putfile(ifd, fp, file_mode, ti.size)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001319
Takuto Ikuta523c6472019-09-18 02:53:34 +00001320 else:
1321 raise isolated_format.IsolatedError('Unknown file type %r' % filetype)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001322
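# The |props| dict consumed by _map_file() above mirrors one 'files' entry of
# a .isolated file; a sketch with made-up values:
#
#   props = {
#     'h': '6dcd4ce2...',  # content digest, used to look the file up in cache
#     's': 12,             # size in bytes
#     'm': 0o644,          # optional POSIX mode bits
#     't': 'basic',        # optional file type: 'basic' (default) or 'tar'
#   }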
1323
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001324def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1325 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001326 """Aggressively downloads the .isolated file(s), then downloads all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001327
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001328 Arguments:
1329 isolated_hash: hash of the root *.isolated file.
1330 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001331 cache: ContentAddressedCache class that knows how to store and map files
1332 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001333 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001334 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001335 filter_cb: filter that works as a whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001336
1337 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001338 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001339 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001340 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001341 'fetch_isolated(%s, %s, %s, %s, %s)',
1342 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001343 # Hash algorithm to use, defined by the namespace |storage| is using.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001344 algo = storage.server_ref.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001345 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001346 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001347
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001348 with tools.Profiler('GetIsolateds'):
1349 # Optionally support local files by manually adding them to cache.
1350 if not isolated_format.is_valid_hash(isolated_hash, algo):
1351 logging.debug('%s is not a valid hash, assuming a file '
1352 '(algo was %s, hash size was %d)',
1353 isolated_hash, algo(), algo().digest_size)
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001354 path = six.text_type(os.path.abspath(isolated_hash))
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001355 try:
1356 isolated_hash = fetch_queue.inject_local_file(path, algo)
1357 except IOError as e:
1358 raise isolated_format.MappingError(
1359 '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1360 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001361
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001362 # Load all *.isolated and start loading rest of the files.
1363 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001364
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001365 with tools.Profiler('GetRest'):
1366 # Create file system hierarchy.
1367 file_path.ensure_tree(outdir)
1368 create_directories(outdir, bundle.files)
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001369 _create_symlinks(outdir, bundle.files.items())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001370
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001371 # Ensure working directory exists.
1372 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
1373 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001374
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001375 # Multimap: digest -> list of pairs (path, props).
1376 remaining = {}
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001377 for filepath, props in bundle.files.items():
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001378 if 'h' in props:
1379 remaining.setdefault(props['h'], []).append((filepath, props))
1380 fetch_queue.wait_on(props['h'])
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001381
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001382 # Now block on the remaining files to be downloaded and mapped.
1383 logging.info('Retrieving remaining files (%d of them)...',
1384 fetch_queue.pending_count)
1385 last_update = time.time()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001386
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001387 with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
1388 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1389 while remaining:
1390 detector.ping()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001391
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001392 # Wait for any item to finish fetching to cache.
1393 digest = fetch_queue.wait()
tansell9e04a8d2016-07-28 09:31:59 -07001394
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001395 # Create the files in the destination using item in cache as the
1396 # source.
1397 for filepath, props in remaining.pop(digest):
1398 fullpath = os.path.join(outdir, filepath)
tanselle4288c32016-07-28 09:45:40 -07001399
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001400 putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
1401 _map_file, fullpath, digest,
1402 props, cache, bundle.read_only,
1403 use_symlinks)
tanselle4288c32016-07-28 09:45:40 -07001404
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001405 # Report progress.
1406 duration = time.time() - last_update
1407 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1408 msg = '%d files remaining...' % len(remaining)
1409 sys.stdout.write(msg + '\n')
1410 sys.stdout.flush()
1411 logging.info(msg)
1412 last_update = time.time()
1413 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
1414 putfile_thread_pool.join()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001415
Marc-Antoine Ruele9558372018-08-03 03:41:22 +00001416 # Save the cache right away to not lose the state of the new objects.
1417 cache.save()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001418 # The cache could have evicted items we just fetched; that's a fatal error.
1419 if not fetch_queue.verify_all_cached():
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001420 free_disk = file_path.get_free_space(cache.cache_dir)
1421 msg = (
1422 'Cache is too small to hold all requested files.\n'
1423 ' %s\n cache=%d bytes, %d items; %s bytes free') % (
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +00001424 cache.policies, cache.total_size, len(cache), free_disk)
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001425 raise isolated_format.MappingError(msg)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001426 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001427
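# A sketch of a typical fetch_isolated() call, mirroring CMDdownload below;
# the hash and output directory are placeholders:
#
#   cache = process_cache_options(options, trim=True)
#   with get_storage(server_ref) as storage:
#     bundle = fetch_isolated(
#         isolated_hash=u'6dcd4ce2' * 5,  # 40 hex chars for SHA-1.
#         storage=storage,
#         cache=cache,
#         outdir=u'/tmp/out',
#         use_symlinks=False)
#     cache.trim()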
1428
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001429def _directory_to_metadata(root, algo, blacklist):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001430 """Yields every file and/or symlink found.
1431
1432 Yields:
1433 tuple(FileItem, relpath, metadata)
1434 For a symlink, FileItem is None.
1435 """
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001436 # Current tar file bundle, if any.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001437 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001438 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001439 for relpath, issymlink in isolated_format.expand_directory_and_symlink(
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001440 root,
1441 u'.' + os.path.sep,
1442 blacklist,
1443 follow_symlinks=(sys.platform != 'win32')):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001444
1445 filepath = os.path.join(root, relpath)
1446 if issymlink:
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001447 # TODO(maruel): Do not call this.
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001448 meta = isolated_format.file_to_metadata(filepath, 0, False)
1449 yield None, relpath, meta
1450 continue
1451
1452 prio = relpath.endswith('.isolated')
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001453 if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
1454 # The file was added to the current pending tarball and won't be archived
1455 # individually.
1456 continue
1457
1458 # Flush and reset the bundle.
1459 for i, p, m in bundle.yield_item_path_meta():
1460 yield i, p, m
1461 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001462
1463 # Yield the file individually.
1464 item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
1465 yield item, relpath, item.meta
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001466
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001467 for i, p, m in bundle.yield_item_path_meta():
1468 yield i, p, m
1469
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001470
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001471def _print_upload_stats(items, missing):
1472 """Prints upload stats."""
1473 total = len(items)
1474 total_size = sum(f.size for f in items)
1475 logging.info(
1476 'Total: %6d, %9.1fkiB', total, total_size / 1024.)
1477 cache_hit = set(items).difference(missing)
1478 cache_hit_size = sum(f.size for f in cache_hit)
1479 logging.info(
1480 'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1481 len(cache_hit),
1482 cache_hit_size / 1024.,
1483 len(cache_hit) * 100. / total,
1484 cache_hit_size * 100. / total_size if total_size else 0)
1485 cache_miss = missing
1486 cache_miss_size = sum(f.size for f in cache_miss)
1487 logging.info(
1488 'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1489 len(cache_miss),
1490 cache_miss_size / 1024.,
1491 len(cache_miss) * 100. / total,
1492 cache_miss_size * 100. / total_size if total_size else 0)
1493
1494
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001495def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001496 """Called by archive_files_to_storage for a directory.
1497
1498 Creates a .isolated file for the directory contents.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001499
1500 Yields:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001501 FileItem for every file found, plus one for the .isolated file itself.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001502 """
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001503 files = {}
1504 for item, relpath, meta in _directory_to_metadata(
1505 dirpath, hash_algo, blacklist):
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001506 # item is None for a symlink.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001507 files[relpath] = meta
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001508 if item:
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001509 yield item
1510
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001511 # TODO(maruel): If there is no file, don't yield a .isolated file.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001512 data = {
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001513 'algo': hash_algo_name,
1514 'files': files,
1515 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001516 }
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001517 # Keep the file in memory. This is fine because .isolated files are relatively
1518 # small.
1519 yield BufferItem(
1520 tools.format_json(data, True), algo=hash_algo, high_priority=True)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001521
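# A sketch of the .isolated content yielded above, assuming SHA-1 and a
# single file (hash, size and mode values are made up):
#
#   {
#     "algo": "sha-1",
#     "files": {
#       "dir/a.txt": {"h": "6dcd4ce2...", "s": 12, "m": 420}
#     },
#     "version": "<isolated_format.ISOLATED_FILE_VERSION>"
#   }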
1522
Takuto Ikuta26980872020-04-09 06:56:37 +00001523def archive_files_to_storage(storage, files, blacklist, verify_push=False):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001524 """Stores every entry into remote storage and returns stats.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001525
1526 Arguments:
1527 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001528 files: iterable of files to upload. If a directory is specified (with a
1529 trailing slash), a .isolated file is created and its hash is returned.
1530 Duplicates are skipped.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001531 blacklist: function that returns True if a file should be omitted.
Takuto Ikuta26980872020-04-09 06:56:37 +00001532 verify_push: verify files are uploaded correctly by fetching from server.
maruel064c0a32016-04-05 11:47:15 -07001533
1534 Returns:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001535 tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001536 The first file in the first item is always the .isolated file.
Ye Kuang4e994292020-04-10 07:07:35 +00001537
1538 Raises:
1539 Re-raises the exception in upload_items(), if there is any.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001540 """
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001541 # Dict of path to hash.
1542 results = collections.OrderedDict()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001543 hash_algo = storage.server_ref.hash_algo
1544 hash_algo_name = storage.server_ref.hash_algo_name
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001545 # Generator of FileItem to pass to upload_items() concurrent operation.
1546 channel = threading_utils.TaskChannel()
Ye Kuang4e994292020-04-10 07:07:35 +00001547 exc_channel = threading_utils.TaskChannel()
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001548 uploaded_digests = set()
Ye Kuang4e994292020-04-10 07:07:35 +00001549
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001550 def _upload_items():
Ye Kuang4e994292020-04-10 07:07:35 +00001551 try:
1552 uploaded = storage.upload_items(channel, verify_push)
1553 uploaded_digests.update(f.digest for f in uploaded)
1554 except Exception:
1555 exc_channel.send_exception()
1556
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001557 t = threading.Thread(target=_upload_items)
1558 t.start()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001559
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001560 # Keep track locally of the items to determine cold and hot items.
1561 items_found = []
1562 try:
1563 for f in files:
Takuto Ikuta95459dd2019-10-29 12:39:47 +00001564 assert isinstance(f, six.text_type), repr(f)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001565 if f in results:
1566 # Duplicate
1567 continue
1568 try:
1569 filepath = os.path.abspath(f)
1570 if fs.isdir(filepath):
1571 # Uploading a whole directory.
1572 item = None
1573 for item in _enqueue_dir(
1574 filepath, blacklist, hash_algo, hash_algo_name):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001575 channel.send_result(item)
1576 items_found.append(item)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001577 # The very last item will be the .isolated file.
1578 if not item:
1579 # There was no file in the directory.
1580 continue
1581 elif fs.isfile(filepath):
1582 item = FileItem(
1583 path=filepath,
1584 algo=hash_algo,
1585 size=None,
1586 high_priority=f.endswith('.isolated'))
1587 channel.send_result(item)
1588 items_found.append(item)
1589 else:
1590 raise Error('%s is neither a file nor a directory.' % f)
1591 results[f] = item.digest
1592 except OSError:
1593 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001594 finally:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001595 # Stops the generator, so _upload_items() can exit.
1596 channel.send_done()
1597 t.join()
Ye Kuang4e994292020-04-10 07:07:35 +00001598 exc_channel.send_done()
Takuto Ikuta640fcbf2020-04-23 00:12:46 +00001599
1600 try:
1601 for _ in exc_channel:
1602 pass
1603 except Exception:
1604 # Log the items when the upload failed.
1605 for item in items_found:
1606 if isinstance(item, FileItem):
1607 logging.error('FileItem path: %s, digest:%s, re-calculated digest:%s',
1608 item.path, item.digest,
1609 isolated_format.hash_file(item.path, item.algo))
1610 continue
1611
1612 logging.error('Item digest:%s', item.digest)
1613
1614 raise
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001615
1616 cold = []
1617 hot = []
1618 for i in items_found:
1619 # Note that multiple FileItems may have the same .digest.
1620 if i.digest in uploaded_digests:
1621 cold.append(i)
1622 else:
1623 hot.append(i)
1624 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001625
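# A sketch of calling archive_files_to_storage() directly (paths are
# placeholders); the returned OrderedDict maps each input path to its hash,
# a directory entry mapping to the hash of its generated .isolated file:
#
#   blacklist = tools.gen_blacklist(DEFAULT_BLACKLIST)
#   with get_storage(server_ref) as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'/path/to/dir'], blacklist)
#     _print_upload_stats(cold + hot, cold)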
1626
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001627@subcommand.usage('<file1..fileN> or - to read from stdin')
1628def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001629 """Archives data to the server.
1630
1631 If a directory is specified, a .isolated file is created the whole directory
1632 is uploaded. Then this .isolated file can be included in another one to run
1633 commands.
1634
1635 The command outputs each processed file with its content hash. For
1636 directories, the .isolated generated for the directory is listed as the
1637 directory entry itself.
1638 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001639 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001640 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001641 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001642 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001643 server_ref = isolate_storage.ServerRef(
1644 options.isolate_server, options.namespace)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001645 if files == ['-']:
1646 files = (l.rstrip('\n\r') for l in sys.stdin)
1647 if not files:
1648 parser.error('Nothing to upload')
1649 files = (f.decode('utf-8') for f in files)
1650 blacklist = tools.gen_blacklist(options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001651 try:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001652 with get_storage(server_ref) as storage:
1653 results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001654 except (Error, local_caching.NoMoreSpace) as e:
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001655 parser.error(e.args[0])
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001656 print('\n'.join('%s %s' % (h, f) for f, h in results.items()))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001657 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001658
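# Example invocation (sketch; the server URL is a placeholder):
#
#   isolateserver.py archive -I https://isolate.example.com some_dir
#
# One '<hash> <path>' line is printed per argument; for a directory, the
# hash is that of the generated .isolated file.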
1659
1660def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001661 """Download data from the server.
1662
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001663 It can either download individual files or a complete tree from a .isolated
1664 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001665 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001666 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001667 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05001668 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001669 help='hash of an isolated file, .isolated file content is discarded, use '
1670 '--file if you need it')
1671 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001672 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1673 help='hash and destination of a file, can be used multiple times')
1674 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001675 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001676 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07001677 parser.add_option(
1678 '--use-symlinks', action='store_true',
1679 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001680 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001681 options, args = parser.parse_args(args)
1682 if args:
1683 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruel5028ba22017-08-25 17:37:51 -04001684 if not file_path.enable_symlink():
Marc-Antoine Ruel5a024272019-01-15 20:11:16 +00001685 logging.warning('Symlink support is not enabled')
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001686
nodir55be77b2016-05-03 09:39:57 -07001687 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001688 if bool(options.isolated) == bool(options.file):
1689 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07001690 if not options.cache and options.use_symlinks:
1691 parser.error('--use-symlinks requires the use of a cache with --cache')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001692
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001693 cache = process_cache_options(options, trim=True)
maruel2e8d0f52016-07-16 07:51:29 -07001694 cache.cleanup()
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001695 options.target = six.text_type(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001696 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07001697 if (fs.isfile(options.target) or
1698 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001699 parser.error(
1700 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001701 server_ref = isolate_storage.ServerRef(
1702 options.isolate_server, options.namespace)
1703 with get_storage(server_ref) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08001704 # Fetching individual files.
1705 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001706 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08001707 channel = threading_utils.TaskChannel()
1708 pending = {}
1709 for digest, dest in options.file:
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001710 dest = six.text_type(dest)
Vadim Shtayura3172be52013-12-03 12:49:05 -08001711 pending[digest] = dest
1712 storage.async_fetch(
1713 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07001714 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001715 digest,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001716 local_caching.UNKNOWN_FILE_SIZE,
1717 functools.partial(
1718 local_caching.file_write, os.path.join(options.target, dest)))
Vadim Shtayura3172be52013-12-03 12:49:05 -08001719 while pending:
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +00001720 fetched = channel.next()
Vadim Shtayura3172be52013-12-03 12:49:05 -08001721 dest = pending.pop(fetched)
1722 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001723
Vadim Shtayura3172be52013-12-03 12:49:05 -08001724 # Fetching whole isolated tree.
1725 if options.isolated:
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001726 bundle = fetch_isolated(
1727 isolated_hash=options.isolated,
1728 storage=storage,
1729 cache=cache,
1730 outdir=options.target,
1731 use_symlinks=options.use_symlinks)
1732 cache.trim()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001733 if bundle.command:
1734 rel = os.path.join(options.target, bundle.relative_cwd)
1735 print('To run this test please run from the directory %s:' % rel)
1737 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001738
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001739 return 0
1740
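# Example invocations (sketch; hashes and the server URL are placeholders):
#
#   isolateserver.py download -I https://isolate.example.com \
#       -s 6dcd4ce23d88e2ee9568ba546c007c63d9131c1b -t out
#   isolateserver.py download -I https://isolate.example.com \
#       -f 6dcd4ce23d88e2ee9568ba546c007c63d9131c1b payload.bin -t out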
1741
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001742def add_archive_options(parser):
1743 parser.add_option(
1744 '--blacklist',
1745 action='append', default=list(DEFAULT_BLACKLIST),
1746 help='List of regexp to use as blacklist filter when uploading '
1747 'directories')
1748
1749
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001750def add_isolate_server_options(parser):
1751 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001752 parser.add_option(
1753 '-I', '--isolate-server',
1754 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001755 help='URL of the Isolate Server to use. Defaults to the environment '
1756 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1757 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001758 parser.add_option(
aludwind7b7b7e2017-06-29 16:38:50 -07001759 '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
aludwin81178302016-11-30 17:18:49 -08001760 parser.add_option(
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001761 '--namespace', default='default-gzip',
1762 help='The namespace to use on the Isolate Server, default: %default')
1763
1764
nodir55be77b2016-05-03 09:39:57 -07001765def process_isolate_server_options(
1766 parser, options, set_exception_handler, required):
1767 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001768
1769 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001770 """
1771 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001772 if required:
1773 parser.error('--isolate-server is required.')
1774 return
1775
aludwind7b7b7e2017-06-29 16:38:50 -07001776 if options.grpc_proxy:
1777 isolate_storage.set_grpc_proxy(options.grpc_proxy)
aludwin81178302016-11-30 17:18:49 -08001778 else:
1779 try:
1780 options.isolate_server = net.fix_url(options.isolate_server)
1781 except ValueError as e:
1782 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001783 if set_exception_handler:
1784 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001785 try:
1786 return auth.ensure_logged_in(options.isolate_server)
1787 except ValueError as e:
1788 parser.error(str(e))
Marc-Antoine Ruel793bff32019-04-18 17:50:48 +00001789 return None
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001790
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001791
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001792def add_cache_options(parser):
1793 cache_group = optparse.OptionGroup(parser, 'Cache management')
1794 cache_group.add_option(
Marc-Antoine Ruel5aeb3bb2018-06-16 13:11:02 +00001795 '--cache', metavar='DIR', default='cache',
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001796 help='Directory to keep a local cache of the files. Accelerates download '
1797 'by reusing already downloaded files. Default=%default')
1798 cache_group.add_option(
1799 '--max-cache-size',
1800 type='int',
1801 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08001802 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001803 help='Trim if the cache gets larger than this value, default=%default')
1804 cache_group.add_option(
1805 '--min-free-space',
1806 type='int',
1807 metavar='NNN',
1808 default=2*1024*1024*1024,
1809 help='Trim if disk free space becomes lower than this value, '
1810 'default=%default')
1811 cache_group.add_option(
1812 '--max-items',
1813 type='int',
1814 metavar='NNN',
1815 default=100000,
1816 help='Trim if more than this number of items are in the cache '
1817 'default=%default')
1818 parser.add_option_group(cache_group)
1819
1820
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001821def process_cache_options(options, trim, **kwargs):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001822 if options.cache:
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -04001823 policies = local_caching.CachePolicies(
1824 options.max_cache_size,
1825 options.min_free_space,
1826 options.max_items,
1827 # 3 weeks.
1828 max_age_secs=21*24*60*60)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001829
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001830 # |options.cache| path may not exist until DiskContentAddressedCache()
1831 # instance is created.
1832 return local_caching.DiskContentAddressedCache(
Takuto Ikuta6e2ff962019-10-29 12:35:27 +00001833 six.text_type(os.path.abspath(options.cache)), policies, trim, **kwargs)
Marc-Antoine Ruel793bff32019-04-18 17:50:48 +00001834 return local_caching.MemoryContentAddressedCache()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001835
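# A sketch tying the cache options together; parsing with no arguments uses
# the defaults declared in add_cache_options() above:
#
#   parser = optparse.OptionParser()
#   add_cache_options(parser)
#   options, _ = parser.parse_args([])
#   cache = process_cache_options(options, trim=True)
#   # -> local_caching.DiskContentAddressedCache rooted at ./cache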
1836
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001837class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001838 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001839 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001840 self,
1841 version=__version__,
1842 prog=os.path.basename(sys.modules[__name__].__file__),
1843 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001844 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001845
1846 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001847 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001848 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08001849 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001850 return options, args
1851
1852
1853def main(args):
1854 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04001855 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001856
1857
1858if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07001859 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001860 fix_encoding.fix_encoding()
1861 tools.disable_buffering()
1862 colorama.init()
Takuto Ikuta160b4452020-04-15 06:33:55 +00001863 net.set_user_agent('isolateserver.py/' + __version__)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001864 sys.exit(main(sys.argv[1:]))