#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand
from six.moves import queue as Queue

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files to
# look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)
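

# A hedged illustration, not part of the original module: the Nth /pre-upload
# batch uses ITEMS_PER_CONTAINS_QUERIES[N], clamped to the last entry. This is
# the same clamping Storage.upload_items() performs below, so batch sizes ramp
# from 20 items up to a steady 100.
def _example_contains_batch_size(batch_index):
  """Returns the size of the |batch_index|-th /pre-upload batch."""
  return ITEMS_PER_CONTAINS_QUERIES[
      min(batch_index, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]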


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # If the file-like object was created using something like open("test.txt"),
  # possibly by a function outside our control (like the standard library),
  # name will end up being a str. We want all our paths to be unicode objects,
  # so we decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return None

  if fs.exists(name):
    return name
  return None


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and thus disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure that
    all hardlinks have the same owner.
  - Anecdotal reports that ext2 is known to be potentially faulty under a high
    rate of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
    assert fs.exists(dstpath)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  if sys.platform == 'win32' and file_mode and file_mode & stat.S_IWRITE:
    # On Windows, any mode change other than removing stat.S_IWRITE is
    # ignored. Return early to skip the slow/unnecessary chmod call.
    return

  # file_mode of 0 is actually valid, so an explicit check is needed.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
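

# A hedged usage sketch, not part of the original module: extracting a cached
# entry to a task directory. 'cache' and 'digest' are illustrative names; any
# file-like object whose 'name' points at an existing file lets putfile()
# link instead of copy.
def _example_putfile(cache, digest, dstpath):
  with cache.getfileobj(digest) as srcfileobj:
    # 0o500 carries no write bit, so putfile() will hardlink (or symlink with
    # use_symlink=True) rather than copy the content.
    putfile(srcfileobj, dstpath, file_mode=0o500)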


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb file doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
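

# A hedged example, not part of the original module: a minimal round-trip
# through zip_compress()/zip_decompress(). Both functions stream chunks, so
# nothing is concatenated until this assertion does it for comparison.
def _example_zip_round_trip():
  payload = [b'hello ', b'world']
  compressed = list(zip_compress(payload))
  assert b''.join(zip_decompress(iter(compressed))) == b'hello world'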


def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  file_ext = os.path.splitext(filename)[1].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)
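

# A hedged example, not part of the original module: the directory set built
# above for ['a/b/c.txt', 'a/d.txt'] is {'a', 'a/b'}, and the sorted iteration
# guarantees parents are created before children.
def _example_create_directories(base_directory):
  create_directories(
      base_directory,
      [os.path.join('a', 'b', 'c.txt'), os.path.join('a', 'd.txt')])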


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""
  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()


class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, 0, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)
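

# A hedged illustration, not part of the original module: FileItem hashes
# lazily, so building a large list of items costs a stat() per file but no
# hashing until .digest (or .meta) is first accessed. 'server_ref' is an
# isolate_storage.ServerRef.
def _example_file_item(path, server_ref):
  item = FileItem(path, server_ref.hash_algo)  # no hashing happens here
  return item.digest  # the file is hashed on first access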


class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on the
  fly.
  """

  def __init__(self, root, algo):
    # 2 trailing 512 bytes headers.
    super(TarBundle, self).__init__(size=1024)
    self._items = []
    self._meta = None
    self._algo = algo
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Tries to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should be
    migrated to use the Go code instead.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # https://crbug.com/825418
    return False
    self._size += rounded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Yields tuples of (Item, filepath, meta_dict).

    If the bundle contains fewer than 5 items, the individual items are yielded
    instead of the bundle.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
    def _tar_thread():
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
      'h': self.digest,
      's': self.size,
      't': u'tar',
    }
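

# A hedged worked example of try_add()'s tar block accounting above, not part
# of the original module: tar stores content in 512-byte blocks, so a
# 1000-byte file grows the archive by (1000 + 512) & ~511 == 1024 bytes.
def _example_tar_rounded_size(file_size):
  return (file_size + 512) & ~511  # e.g. 1000 -> 1024, 1024 -> 1536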


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]
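

# A hedged example, not part of the original module: BufferItem is the
# in-memory counterpart of FileItem, useful for pushing generated content
# (e.g. a serialized .isolated document) without writing it to disk first.
def _example_buffer_item(server_ref, serialized_isolated):
  return BufferItem(serialized_isolated, server_ref.hash_algo)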


class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within single namespace (and thus hashing algorithm and compression
  scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      max_size = long(2)**32 if sys.version_info.major == 2 else 2**32
      if sys.maxsize <= max_size:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes them to the first thread.
    Then it join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
         |
         v
    _create_items_batches_thread                      Thread #1
    (generates list(Item), every 3s or 20~100 items)
         |
         v
    _do_lookups_thread                                Thread #2
         |         |
         v         v
    (missing)  (was on server)
         |
         v
    _handle_missing_thread                            Thread #3
         |
         v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
          upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing, to_upload
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.items():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().items():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        channel = threading_utils.TaskChannel()
        pending_upload = 0
        while not self._aborted:
          try:
            missing_item, push_state = missing.get(True, timeout=5)
            if missing_item is None:
              break
            self._async_push(channel, missing_item, push_state)
            pending_upload += 1
          except Queue.Empty:
            pass
          detector.ping()
          while not self._aborted and pending_upload:
            try:
              item = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
        while not self._aborted and pending_upload:
          item = channel.next()
          uploaded.append(item)
          pending_upload -= 1
          logging.debug(
              'Uploaded %d; %d pending: %s (%d)',
              len(uploaded), pending_upload, item.digest, item.size)

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point of
      # view of the isolate server (it doesn't care about paths at all, only
      # content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger items
        # being processed first. This is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded

  def _async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same as
      # the one provided by 'item'. Since '[data]' is a list, it can safely be
      # reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self.server_ref.is_with_compression:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(
            stream, self.server_ref.hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
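

# A hedged usage sketch, not part of the original module: driving the 3-thread
# pipeline in Storage.upload_items(). 'paths' is an illustrative argument
# name; get_storage() is defined later in this file.
def _example_upload(server_ref, paths):
  with get_storage(server_ref) as storage:
    items = [FileItem(p, server_ref.hash_algo) for p in paths]
    # Only items the /contains lookup reports as missing are pushed.
    return storage.upload_items(items)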


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves. In the first wave it is called for
    HIGH priority items, the isolated files themselves. In the second wave it
    is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)
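

# A hedged usage sketch, not part of the original module: fetching a single
# digest through FetchQueue, mirroring how IsolatedBundle.fetch() drives it
# with wait_on()/add()/wait().
def _example_fetch_one(storage, cache, digest):
  fetch_queue = FetchQueue(storage, cache)
  fetch_queue.wait_on(digest)
  fetch_queue.add(digest, priority=threading_utils.PRIORITY_HIGH)
  assert fetch_queue.wait() == digest
  return cache.getfileobj(digest)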


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
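

# A hedged example, not part of the original module: verifying an in-memory
# "stream" of one chunk. A size or digest mismatch surfaces as IOError raised
# from the run() generator.
def _example_verify(server_ref, data):
  expected_digest = server_ref.hash_algo(data).hexdigest()
  verifier = FetchStreamVerifier(
      iter([data]), server_ref.hash_algo, expected_digest, len(data))
  return ''.join(verifier.run())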
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001076
1077
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001078class IsolatedBundle(object):
1079 """Fetched and parsed .isolated file with all dependencies."""
1080
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001081 def __init__(self, filter_cb):
1082 """
1083 filter_cb: callback function to filter downloaded content.
1084 When filter_cb is not None, Isolated file is downloaded iff
1085 filter_cb(filepath) returns True.
1086 """
1087
Vadim Shtayura3148e072014-09-02 18:51:52 -07001088 self.command = []
1089 self.files = {}
1090 self.read_only = None
1091 self.relative_cwd = None
1092 # The main .isolated file, a IsolatedFile instance.
1093 self.root = None
1094
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001095 self._filter_cb = filter_cb
1096
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001097 def fetch(self, fetch_queue, root_isolated_hash, algo):
1098 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001099
1100 It enables support for "included" .isolated files. They are processed in
1101 strict order but fetched asynchronously from the cache. This is important so
1102 that a file in an included .isolated file that is overridden by an embedding
1103 .isolated file is not fetched needlessly. The includes are fetched in one
1104 pass and the files are fetched as soon as all the ones on the left-side
1105 of the tree were fetched.
1106
1107 The prioritization is very important here for nested .isolated files.
1108 'includes' have the highest priority and the algorithm is optimized for both
1109 deep and wide trees. A deep one is a long link of .isolated files referenced
1110 one at a time by one item in 'includes'. A wide one has a large number of
1111 'includes' in a single .isolated file. 'left' is defined as an included
1112 .isolated file earlier in the 'includes' list. So the order of the elements
1113 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001114
1115 As a side effect this method starts asynchronous fetch of all data files
1116 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1117 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001118 """
1119 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1120
1121 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1122 pending = {}
1123 # Set of hashes of already retrieved items to refuse recursive includes.
1124 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001125 # Set of IsolatedFile's whose data files have already being fetched.
1126 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001127
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001128 def retrieve_async(isolated_file):
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001129 """Retrieves an isolated file included by the root bundle."""
Vadim Shtayura3148e072014-09-02 18:51:52 -07001130 h = isolated_file.obj_hash
1131 if h in seen:
1132 raise isolated_format.IsolatedError(
1133 'IsolatedFile %s is retrieved recursively' % h)
1134 assert h not in pending
1135 seen.add(h)
1136 pending[h] = isolated_file
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001137 # This isolated item is being added dynamically, notify FetchQueue.
1138 fetch_queue.wait_on(h)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001139 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1140
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001141 # Start fetching root *.isolated file (single file, not the whole bundle).
1142 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001143
1144 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001145 # Wait until some *.isolated file is fetched, parse it.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001146 item_hash = fetch_queue.wait()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001147 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001148 with fetch_queue.cache.getfileobj(item_hash) as f:
1149 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001150
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001151 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001152 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001153 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001154
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001155 # Always fetch *.isolated files in traversal order, waiting if necessary
1156 # until next to-be-processed node loads. "Waiting" is done by yielding
1157 # back to the outer loop, that waits until some *.isolated is loaded.
1158 for node in isolated_format.walk_includes(self.root):
1159 if node not in processed:
1160 # Not visited, and not yet loaded -> wait for it to load.
1161 if not node.is_loaded:
1162 break
1163 # Not visited and loaded -> process it and continue the traversal.
1164 self._start_fetching_files(node, fetch_queue)
1165 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001166
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001167 # All *.isolated files should be processed by now and only them.
1168 all_isolateds = set(isolated_format.walk_includes(self.root))
1169 assert all_isolateds == processed, (all_isolateds, processed)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001170 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
Vadim Shtayura3148e072014-09-02 18:51:52 -07001171
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001172 # Extract 'command' and other bundle properties.
1173 for node in isolated_format.walk_includes(self.root):
1174 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001175 self.relative_cwd = self.relative_cwd or ''
1176
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001177 def _start_fetching_files(self, isolated, fetch_queue):
1178 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001179
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001180 Modifies self.files.
1181 """
maruel10bea7b2016-12-07 05:03:49 -08001182 files = isolated.data.get('files', {})
1183 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001184 for filepath, properties in files.items():
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001185 if self._filter_cb and not self._filter_cb(filepath):
1186 continue
1187
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001188 # Root isolated has priority on the files being mapped. In particular,
1189 # overridden files must not be fetched.
1190 if filepath not in self.files:
1191 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001192
1193 # Make sure if the isolated is read only, the mode doesn't have write
1194 # bits.
1195 if 'm' in properties and self.read_only:
1196 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1197
1198 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001199 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001200 fetch_queue.add(
1201 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1202
1203 def _update_self(self, node):
1204 """Extracts bundle global parameters from loaded *.isolated file.
1205
1206 Will be called with each loaded *.isolated file in order of traversal of
1207 isolated include graph (see isolated_format.walk_includes).
1208 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001209 # Grabs properties.
1210 if not self.command and node.data.get('command'):
1211 # Ensure paths are correctly separated on windows.
1212 self.command = node.data['command']
1213 if self.command:
1214 self.command[0] = self.command[0].replace('/', os.path.sep)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001215 if self.read_only is None and node.data.get('read_only') is not None:
1216 self.read_only = node.data['read_only']
1217 if (self.relative_cwd is None and
1218 node.data.get('relative_cwd') is not None):
1219 self.relative_cwd = node.data['relative_cwd']
1220
1221
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001222def get_storage(server_ref):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001223 """Returns Storage class that can upload and download from |namespace|.
1224
1225 Arguments:
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001226 server_ref: isolate_storage.ServerRef instance.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001227
1228 Returns:
1229 Instance of Storage.
1230 """
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001231 assert isinstance(server_ref, isolate_storage.ServerRef), repr(server_ref)
1232 return Storage(isolate_storage.get_storage_api(server_ref))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001233
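# Usage sketch; the server URL below is a made-up placeholder, not a real
# endpoint:
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     results = storage.upload_items(items)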
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001234
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001235def _map_file(dst, digest, props, cache, read_only, use_symlinks):
1236 """Put downloaded file to destination path. This function is used for multi
1237 threaded file putting.
1238 """
Takuto Ikuta523c6472019-09-18 02:53:34 +00001239 with tools.Profiler("_map_file for %s" % dst):
1240 with cache.getfileobj(digest) as srcfileobj:
1241 filetype = props.get('t', 'basic')
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001242
Takuto Ikuta523c6472019-09-18 02:53:34 +00001243 if filetype == 'basic':
1244 # Ignore all bits apart from the user.
1245 file_mode = (props.get('m') or 0o500) & 0o700
1246 if read_only:
1247 # Enforce read-only if the root bundle does.
1248 file_mode &= 0o500
1249 putfile(srcfileobj, dst, file_mode, use_symlink=use_symlinks)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001250
Takuto Ikuta523c6472019-09-18 02:53:34 +00001251 elif filetype == 'tar':
1252 basedir = os.path.dirname(dst)
1253 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1254 ensured_dirs = set()
1255 for ti in t:
1256 if not ti.isfile():
1257 logging.warning('Path(%r) is not a regular file (%s), skipped', ti.name,
1258 ti.type)
1259 continue
1260 # Handle files created on Windows fetched on POSIX and the
1261 # reverse.
1262 other_sep = '/' if os.path.sep == '\\' else '\\'
1263 name = ti.name.replace(other_sep, os.path.sep)
1264 fp = os.path.normpath(os.path.join(basedir, name))
# Refuse to extract entries that would escape the destination directory.
1265 if not fp.startswith(basedir):
1266 logging.error('Path(%r) is outside root directory, skipped', fp)
continue
1267 ifd = t.extractfile(ti)
1268 fp_dir = os.path.dirname(fp)
1269 if fp_dir not in ensured_dirs:
1270 file_path.ensure_tree(fp_dir)
1271 ensured_dirs.add(fp_dir)
1272 file_mode = ti.mode & 0o700
1273 if read_only:
1274 # Enforce read-only if the root bundle does.
1275 file_mode &= 0o500
1276 putfile(ifd, fp, file_mode, ti.size)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001277
Takuto Ikuta523c6472019-09-18 02:53:34 +00001278 else:
1279 raise isolated_format.IsolatedError('Unknown file type %r' % filetype)
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001280
1281
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001282def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1283 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001284 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001285
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001286 Arguments:
1287 isolated_hash: hash of the root *.isolated file.
1288 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001289 cache: ContentAddressedCache class that knows how to store and map files
1290 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001291 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001292 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001293 filter_cb: callback that works as a whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001294
1295 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001296 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001297 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001298 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001299 'fetch_isolated(%s, %s, %s, %s, %s)',
1300 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001301 # Hash algorithm to use, defined by the namespace |storage| is using.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001302 algo = storage.server_ref.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001303 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001304 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001305
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001306 with tools.Profiler('GetIsolateds'):
1307 # Optionally support local files by manually adding them to cache.
1308 if not isolated_format.is_valid_hash(isolated_hash, algo):
1309 logging.debug('%s is not a valid hash, assuming a file '
1310 '(algo was %s, hash size was %d)',
1311 isolated_hash, algo(), algo().digest_size)
1312 path = unicode(os.path.abspath(isolated_hash))
1313 try:
1314 isolated_hash = fetch_queue.inject_local_file(path, algo)
1315 except IOError as e:
1316 raise isolated_format.MappingError(
1317 '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1318 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001319
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001320 # Load all *.isolated and start loading rest of the files.
1321 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001322
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001323 with tools.Profiler('GetRest'):
1324 # Create file system hierarchy.
1325 file_path.ensure_tree(outdir)
1326 create_directories(outdir, bundle.files)
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001327 _create_symlinks(outdir, bundle.files.items())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001328
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001329 # Ensure working directory exists.
1330 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
1331 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001332
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001333 # Multimap: digest -> list of pairs (path, props).
1334 remaining = {}
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001335 for filepath, props in bundle.files.items():
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001336 if 'h' in props:
1337 remaining.setdefault(props['h'], []).append((filepath, props))
1338 fetch_queue.wait_on(props['h'])
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001339
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001340 # Now block on the remaining files to be downloaded and mapped.
1341 logging.info('Retrieving remaining files (%d of them)...',
1342 fetch_queue.pending_count)
1343 last_update = time.time()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001344
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001345 with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
1346 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1347 while remaining:
1348 detector.ping()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001349
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001350 # Wait for any item to finish fetching to cache.
1351 digest = fetch_queue.wait()
tansell9e04a8d2016-07-28 09:31:59 -07001352
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001353 # Create the files in the destination using the item in the cache as
1354 # the source.
1355 for filepath, props in remaining.pop(digest):
1356 fullpath = os.path.join(outdir, filepath)
tanselle4288c32016-07-28 09:45:40 -07001357
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001358 putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
1359 _map_file, fullpath, digest,
1360 props, cache, bundle.read_only,
1361 use_symlinks)
tanselle4288c32016-07-28 09:45:40 -07001362
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001363 # Report progress.
1364 duration = time.time() - last_update
1365 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1366 msg = '%d files remaining...' % len(remaining)
1367 sys.stdout.write(msg + '\n')
1368 sys.stdout.flush()
1369 logging.info(msg)
1370 last_update = time.time()
1371 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
1372 putfile_thread_pool.join()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001373
Marc-Antoine Ruele9558372018-08-03 03:41:22 +00001374 # Save the cache right away to not lose the state of the new objects.
1375 cache.save()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001376 # The cache could have evicted some items we just fetched; that's a fatal error.
1377 if not fetch_queue.verify_all_cached():
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001378 free_disk = file_path.get_free_space(cache.cache_dir)
1379 msg = (
1380 'Cache is too small to hold all requested files.\n'
1381 ' %s\n cache=%d bytes, %d items; %s bytes free') % (
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +00001382 cache.policies, cache.total_size, len(cache), free_disk)
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001383 raise isolated_format.MappingError(msg)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001384 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001385
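# Usage sketch; the server URL, hash and output directory below are made-up
# placeholders:
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   cache = local_caching.MemoryContentAddressedCache()
#   with get_storage(server_ref) as storage:
#     bundle = fetch_isolated(
#         isolated_hash=u'deadbeef' * 5,
#         storage=storage,
#         cache=cache,
#         outdir=u'/tmp/out',
#         use_symlinks=False)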
1386
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001387def _directory_to_metadata(root, algo, blacklist):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001388 """Yields every file and/or symlink found.
1389
1390 Yields:
1391 tuple(FileItem, relpath, metadata)
1392 For a symlink, FileItem is None.
1393 """
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001394 # Current tar file bundle, if any.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001395 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001396 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001397 for relpath, issymlink in isolated_format.expand_directory_and_symlink(
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001398 root,
1399 u'.' + os.path.sep,
1400 blacklist,
1401 follow_symlinks=(sys.platform != 'win32')):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001402
1403 filepath = os.path.join(root, relpath)
1404 if issymlink:
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001405 # TODO(maruel): Do not call this.
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001406 meta = isolated_format.file_to_metadata(filepath, 0, False)
1407 yield None, relpath, meta
1408 continue
1409
1410 prio = relpath.endswith('.isolated')
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001411 if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
1412 # The file was added to the current pending tarball and won't be archived
1413 # individually.
1414 continue
1415
1416 # Flush and reset the bundle.
1417 for i, p, m in bundle.yield_item_path_meta():
1418 yield i, p, m
1419 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001420
1421 # Yield the file individually.
1422 item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
1423 yield item, relpath, item.meta
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001424
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001425 for i, p, m in bundle.yield_item_path_meta():
1426 yield i, p, m
1427
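# Usage sketch; the directory path is a made-up placeholder and hash_algo is
# obtained the same way archive_files_to_storage does it:
#   hash_algo = storage.server_ref.hash_algo
#   for item, relpath, meta in _directory_to_metadata(
#       u'/tmp/dir', hash_algo, tools.gen_blacklist([])):
#     # item is None for symlinks; meta describes every entry.
#     ...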
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001428
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001429def _print_upload_stats(items, missing):
1430 """Prints upload stats."""
1431 total = len(items)
1432 total_size = sum(f.size for f in items)
1433 logging.info(
1434 'Total: %6d, %9.1fkiB', total, total_size / 1024.)
1435 cache_hit = set(items).difference(missing)
1436 cache_hit_size = sum(f.size for f in cache_hit)
1437 logging.info(
1438 'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1439 len(cache_hit),
1440 cache_hit_size / 1024.,
1441 len(cache_hit) * 100. / total,
1442 cache_hit_size * 100. / total_size if total_size else 0)
1443 cache_miss = missing
1444 cache_miss_size = sum(f.size for f in cache_miss)
1445 logging.info(
1446 'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1447 len(cache_miss),
1448 cache_miss_size / 1024.,
1449 len(cache_miss) * 100. / total,
1450 cache_miss_size * 100. / total_size if total_size else 0)
1451
1452
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001453def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001454 """Called by archive_files_to_storage for a directory.
1455
1456 Creates an .isolated file that describes the directory's content.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001457
1458 Yields:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001459 FileItem for every file found, plus one for the .isolated file itself.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001460 """
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001461 files = {}
1462 for item, relpath, meta in _directory_to_metadata(
1463 dirpath, hash_algo, blacklist):
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001464 # item is None for a symlink.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001465 files[relpath] = meta
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001466 if item:
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001467 yield item
1468
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001469 # TODO(maruel): If there's no file, don't yield an .isolated file.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001470 data = {
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001471 'algo': hash_algo_name,
1472 'files': files,
1473 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001474 }
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001475 # Keep the file in memory. This is fine because .isolated files are relatively
1476 # small.
1477 yield BufferItem(
1478 tools.format_json(data, True), algo=hash_algo, high_priority=True)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001479
1480
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001481def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001482 """Stores every entry into remote storage and returns stats.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001483
1484 Arguments:
1485 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001486 files: iterable of files to upload. If a directory is specified (with a
1487 trailing slash), a .isolated file is created and its hash is returned.
1488 Duplicates are skipped.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001489 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07001490
1491 Returns:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001492 tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001493 For a directory entry, the mapped hash is that of its generated .isolated file.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001494 """
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001495 # Dict of path to hash.
1496 results = collections.OrderedDict()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001497 hash_algo = storage.server_ref.hash_algo
1498 hash_algo_name = storage.server_ref.hash_algo_name
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001499 # Generator of FileItem to pass to upload_items() concurrent operation.
1500 channel = threading_utils.TaskChannel()
1501 uploaded_digests = set()
1502 def _upload_items():
1503 uploaded = storage.upload_items(channel)
1504 uploaded_digests.update(f.digest for f in uploaded)
1505 t = threading.Thread(target=_upload_items)
1506 t.start()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001507
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001508 # Keep track locally of the items to determine cold and hot items.
1509 items_found = []
1510 try:
1511 for f in files:
1512 assert isinstance(f, unicode), repr(f)
1513 if f in results:
1514 # Duplicate
1515 continue
1516 try:
1517 filepath = os.path.abspath(f)
1518 if fs.isdir(filepath):
1519 # Uploading a whole directory.
1520 item = None
1521 for item in _enqueue_dir(
1522 filepath, blacklist, hash_algo, hash_algo_name):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001523 channel.send_result(item)
1524 items_found.append(item)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001525 # The very last item will be the .isolated file.
1526 if not item:
1527 # There was no file in the directory.
1528 continue
1529 elif fs.isfile(filepath):
1530 item = FileItem(
1531 path=filepath,
1532 algo=hash_algo,
1533 size=None,
1534 high_priority=f.endswith('.isolated'))
1535 channel.send_result(item)
1536 items_found.append(item)
1537 else:
1538 raise Error('%s is neither a file nor a directory.' % f)
1539 results[f] = item.digest
1540 except OSError:
1541 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001542 finally:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001543 # Stops the generator, so _upload_items() can exit.
1544 channel.send_done()
1545 t.join()
1546
1547 cold = []
1548 hot = []
1549 for i in items_found:
1550 # Note that multiple FileItem may have the same .digest.
1551 if i.digest in uploaded_digests:
1552 cold.append(i)
1553 else:
1554 hot.append(i)
1555 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001556
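# Usage sketch; the server URL and path are made-up placeholders:
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   blacklist = tools.gen_blacklist(DEFAULT_BLACKLIST)
#   with get_storage(server_ref) as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'/tmp/dir/'], blacklist)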
1557
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001558@subcommand.usage('<file1..fileN> or - to read from stdin')
1559def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001560 """Archives data to the server.
1561
1562 If a directory is specified, a .isolated file is created the whole directory
1563 is uploaded. Then this .isolated file can be included in another one to run
1564 commands.
1565
1566 The command outputs each processed file with its content hash. For
1567 directories, the .isolated generated for the directory is listed as the
1568 directory entry itself.
1569 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001570 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001571 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001572 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001573 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001574 server_ref = isolate_storage.ServerRef(
1575 options.isolate_server, options.namespace)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001576 if files == ['-']:
1577 files = (l.rstrip('\n\r') for l in sys.stdin)
1578 if not files:
1579 parser.error('Nothing to upload')
1580 files = (f.decode('utf-8') for f in files)
1581 blacklist = tools.gen_blacklist(options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001582 try:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001583 with get_storage(server_ref) as storage:
1584 results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001585 except (Error, local_caching.NoMoreSpace) as e:
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001586 parser.error(e.args[0])
Marc-Antoine Ruel04903a32019-10-09 21:09:25 +00001587 print('\n'.join('%s %s' % (h, f) for f, h in results.items()))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001588 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001589
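# Example invocation (assuming this script is invoked as isolateserver.py;
# the server URL is a made-up placeholder):
#   isolateserver.py archive -I https://isolate.example.com \
#       --namespace default-gzip /path/to/directory/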
1590
1591def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001592 """Download data from the server.
1593
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001594 It can either download individual files or a complete tree from a .isolated
1595 file.
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001596 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001597 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001598 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05001599 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001600 help='hash of an isolated file, .isolated file content is discarded, use '
1601 '--file if you need it')
1602 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001603 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1604 help='hash and destination of a file, can be used multiple times')
1605 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001606 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001607 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07001608 parser.add_option(
1609 '--use-symlinks', action='store_true',
1610 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001611 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001612 options, args = parser.parse_args(args)
1613 if args:
1614 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruel5028ba22017-08-25 17:37:51 -04001615 if not file_path.enable_symlink():
Marc-Antoine Ruel5a024272019-01-15 20:11:16 +00001616 logging.warning('Symlink support is not enabled')
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001617
nodir55be77b2016-05-03 09:39:57 -07001618 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001619 if bool(options.isolated) == bool(options.file):
1620 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07001621 if not options.cache and options.use_symlinks:
1622 parser.error('--use-symlinks requires the use of a cache with --cache')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001623
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001624 cache = process_cache_options(options, trim=True)
maruel2e8d0f52016-07-16 07:51:29 -07001625 cache.cleanup()
maruel12e30012015-10-09 11:55:35 -07001626 options.target = unicode(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001627 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07001628 if (fs.isfile(options.target) or
1629 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001630 parser.error(
1631 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001632 server_ref = isolate_storage.ServerRef(
1633 options.isolate_server, options.namespace)
1634 with get_storage(server_ref) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08001635 # Fetching individual files.
1636 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001637 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08001638 channel = threading_utils.TaskChannel()
1639 pending = {}
1640 for digest, dest in options.file:
Marc-Antoine Ruelcf51ea92017-10-24 17:28:43 -07001641 dest = unicode(dest)
Vadim Shtayura3172be52013-12-03 12:49:05 -08001642 pending[digest] = dest
1643 storage.async_fetch(
1644 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07001645 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001646 digest,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001647 local_caching.UNKNOWN_FILE_SIZE,
1648 functools.partial(
1649 local_caching.file_write, os.path.join(options.target, dest)))
Vadim Shtayura3172be52013-12-03 12:49:05 -08001650 while pending:
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +00001651 fetched = channel.next()
Vadim Shtayura3172be52013-12-03 12:49:05 -08001652 dest = pending.pop(fetched)
1653 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001654
Vadim Shtayura3172be52013-12-03 12:49:05 -08001655 # Fetching whole isolated tree.
1656 if options.isolated:
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001657 bundle = fetch_isolated(
1658 isolated_hash=options.isolated,
1659 storage=storage,
1660 cache=cache,
1661 outdir=options.target,
1662 use_symlinks=options.use_symlinks)
1663 cache.trim()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001664 if bundle.command:
1665 rel = os.path.join(options.target, bundle.relative_cwd)
1666 # |rel| is already absolute since |options.target| was made absolute above.
1667 print('To run this test please run from the directory %s:' % rel)
1668 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001669
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001670 return 0
1671
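# Example invocations (assuming this script is invoked as isolateserver.py;
# the server URL and hash are made-up placeholders):
#   isolateserver.py download -I https://isolate.example.com \
#       -s 1234567890abcdef1234567890abcdef12345678 -t out/
#   isolateserver.py download -I https://isolate.example.com \
#       -f 1234567890abcdef1234567890abcdef12345678 payload.bin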
1672
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001673def add_archive_options(parser):
1674 parser.add_option(
1675 '--blacklist',
1676 action='append', default=list(DEFAULT_BLACKLIST),
1677 help='List of regexp to use as blacklist filter when uploading '
1678 'directories')
1679
1680
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001681def add_isolate_server_options(parser):
1682 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001683 parser.add_option(
1684 '-I', '--isolate-server',
1685 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001686 help='URL of the Isolate Server to use. Defaults to the environment '
1687 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1688 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001689 parser.add_option(
aludwind7b7b7e2017-06-29 16:38:50 -07001690 '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
aludwin81178302016-11-30 17:18:49 -08001691 parser.add_option(
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001692 '--namespace', default='default-gzip',
1693 help='The namespace to use on the Isolate Server, default: %default')
1694
1695
nodir55be77b2016-05-03 09:39:57 -07001696def process_isolate_server_options(
1697 parser, options, set_exception_handler, required):
1698 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001699
1700 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001701 """
1702 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001703 if required:
1704 parser.error('--isolate-server is required.')
1705 return
1706
aludwind7b7b7e2017-06-29 16:38:50 -07001707 if options.grpc_proxy:
1708 isolate_storage.set_grpc_proxy(options.grpc_proxy)
aludwin81178302016-11-30 17:18:49 -08001709 else:
1710 try:
1711 options.isolate_server = net.fix_url(options.isolate_server)
1712 except ValueError as e:
1713 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001714 if set_exception_handler:
1715 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001716 try:
1717 return auth.ensure_logged_in(options.isolate_server)
1718 except ValueError as e:
1719 parser.error(str(e))
Marc-Antoine Ruel793bff32019-04-18 17:50:48 +00001720 return None
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001721
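# Usage sketch, mirroring how CMDarchive wires these helpers together:
#   parser = OptionParserIsolateServer()
#   add_isolate_server_options(parser)
#   options, args = parser.parse_args(args)
#   process_isolate_server_options(parser, options, True, True)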
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001722
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001723def add_cache_options(parser):
1724 cache_group = optparse.OptionGroup(parser, 'Cache management')
1725 cache_group.add_option(
Marc-Antoine Ruel5aeb3bb2018-06-16 13:11:02 +00001726 '--cache', metavar='DIR', default='cache',
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001727 help='Directory to keep a local cache of the files. Accelerates download '
1728 'by reusing already downloaded files. Default=%default')
1729 cache_group.add_option(
1730 '--max-cache-size',
1731 type='int',
1732 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08001733 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001734 help='Trim if the cache gets larger than this value, default=%default')
1735 cache_group.add_option(
1736 '--min-free-space',
1737 type='int',
1738 metavar='NNN',
1739 default=2*1024*1024*1024,
1740 help='Trim if disk free space becomes lower than this value, '
1741 'default=%default')
1742 cache_group.add_option(
1743 '--max-items',
1744 type='int',
1745 metavar='NNN',
1746 default=100000,
1747 help='Trim if more than this number of items are in the cache, '
1748 'default=%default')
1749 parser.add_option_group(cache_group)
1750
1751
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001752def process_cache_options(options, trim, **kwargs):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001753 if options.cache:
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -04001754 policies = local_caching.CachePolicies(
1755 options.max_cache_size,
1756 options.min_free_space,
1757 options.max_items,
1758 # 3 weeks.
1759 max_age_secs=21*24*60*60)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001760
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001761 # |options.cache| path may not exist until DiskContentAddressedCache()
1762 # instance is created.
1763 return local_caching.DiskContentAddressedCache(
Marc-Antoine Ruel79d42192019-02-06 19:24:16 +00001764 unicode(os.path.abspath(options.cache)), policies, trim, **kwargs)
Marc-Antoine Ruel793bff32019-04-18 17:50:48 +00001765 return local_caching.MemoryContentAddressedCache()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001766
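# Usage sketch, mirroring CMDdownload; assumes |options| was produced by a
# parser that went through add_cache_options():
#   cache = process_cache_options(options, trim=True)
#   cache.cleanup()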
1767
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001768class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001769 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001770 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001771 self,
1772 version=__version__,
1773 prog=os.path.basename(sys.modules[__name__].__file__),
1774 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001775 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001776
1777 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001778 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001779 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08001780 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001781 return options, args
1782
1783
1784def main(args):
1785 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04001786 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001787
1788
1789if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07001790 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001791 fix_encoding.fix_encoding()
1792 tools.disable_buffering()
1793 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001794 sys.exit(main(sys.argv[1:]))