#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import Queue
import re
import signal
import stat
import sys
import tarfile
import threading
import time
import zlib

from utils import tools
tools.force_local_third_party()

# third_party/
import colorama
from depot_tools import fix_encoding
from depot_tools import subcommand

# pylint: disable=ungrouped-imports
import auth
import isolated_format
import isolate_storage
import local_caching
from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check against the isolate server per /pre-upload
# query. All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the
# larger the possibility it has changed). Then the first
# ITEMS_PER_CONTAINS_QUERIES[0] files are taken and sent to '/pre-upload', then
# the next ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a
# trade-off; the more per request, the lower the effect of HTTP round trip
# latency and TCP-level chattiness. On the other hand, larger values cause
# longer lookups, increasing the initial latency to start uploading, which is
# especially an issue for large files. This value is optimized for the "few
# thousand files to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


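# For illustration, the default blacklist matches temporary files such as
# 'foo.pyc' or 'bar.swp', and any path component named '.git' or '.svn':
#
#   assert any(re.match(p, 'foo.pyc') for p in DEFAULT_BLACKLIST)
#   assert any(re.match(p, '.git') for p in DEFAULT_BLACKLIST)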
class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file-like object was created by something outside our control (such
  # as the standard library's open("test.txt")), name will end up being a str.
  # We want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is raised). Otherwise all data until
  the EOF marker is copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports suggest ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

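# A minimal round-trip sketch of the two compression helpers above (in-memory
# data, illustrative only):
#
#   raw = 'A' * 4096
#   compressed = ''.join(zip_compress([raw]))
#   assert ''.join(zip_decompress([compressed])) == raw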

def _get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  # Note: splitext() keeps the leading dot, while ALREADY_COMPRESSED_TYPES
  # lists bare extensions, so strip the dot before the membership test.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)


def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class _ThreadFile(object):
  """Multithreaded fake file. Used by TarBundle."""
  def __init__(self):
    self._data = threading_utils.TaskChannel()
    self._offset = 0

  def __iter__(self):
    return self._data

  def tell(self):
    return self._offset

  def write(self, b):
    self._data.send_result(b)
    self._offset += len(b)

  def close(self):
    self._data.send_done()

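# A minimal producer/consumer sketch for _ThreadFile (illustrative only): a
# writer thread calls write() then close(), while the consumer iterates until
# the underlying channel is done, as TarBundle.content() does below.
#
#   obj = _ThreadFile()
#   writer = threading.Thread(target=lambda: (obj.write('hi'), obj.close()))
#   writer.start()
#   data = ''.join(obj)  # Blocks until close().
#   writer.join()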

class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    return self._path

  @property
  def digest(self):
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, 0, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    return file_read(self.path)


class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on the
  fly.
  """

  def __init__(self, root, algo):
    # 2 trailing 512 bytes headers.
    super(TarBundle, self).__init__(size=1024)
    self._items = []
    self._meta = None
    self._algo = algo
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Try to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should be
    migrated to use the Go code instead.
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # https://crbug.com/825418
    return False
    self._size += rounded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Yields tuples (Item, filepath, meta_dict).

    If the bundle contains fewer than 5 items, the individual items are
    yielded; otherwise the bundle yields itself as a single tarball.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
    def _tar_thread():
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
        'h': self.digest,
        's': self.size,
        't': u'tar',
    }


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    super(BufferItem, self).__init__(
        digest=algo(buf).hexdigest(),
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    return [self._buffer]


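# For illustration (assuming a hashlib constructor such as hashlib.sha1):
#
#   import hashlib
#   item = BufferItem('{"version": 1}', hashlib.sha1)
#   assert item.size == 14
#   assert item.content() == ['{"version": 1}']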
class Storage(object):
  """Efficiently downloads or uploads a large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly threadsafe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes to the first thread. Then it
    join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread                      Thread #1
    (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                                Thread #2
       |      |
       v      v
    (missing) (was on server)
       |
       v
    _handle_missing_thread                            Thread #3
       |
       v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
          upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing, to_upload
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.iteritems():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().iteritems():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        channel = threading_utils.TaskChannel()
        pending_upload = 0
        while not self._aborted:
          try:
            missing_item, push_state = missing.get(True, timeout=5)
            if missing_item is None:
              break
            self._async_push(channel, missing_item, push_state)
            pending_upload += 1
          except Queue.Empty:
            pass
          detector.ping()
          while not self._aborted and pending_upload:
            try:
              item = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
        while not self._aborted and pending_upload:
          item = channel.next()
          uploaded.append(item)
          pending_upload -= 1
          logging.debug(
              'Uploaded %d; %d pending: %s (%d)',
              len(uploaded), pending_upload, item.digest, item.size)

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point
      # of view of the isolate server (it doesn't care about paths at all,
      # only content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. That is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded

  def _async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self.server_ref.is_with_compression:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(
            stream, self.server_ref.hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Compute the already fetched items once; these will be returned first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves: the first wave for the HIGH priority
    items, the isolated files themselves, and the second wave for all the
    files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)

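# A minimal fetch sketch for FetchQueue (illustrative only; assumes a Storage
# instance and a content-addressed cache already exist):
#
#   fetch_queue = FetchQueue(storage, cache)
#   fetch_queue.add(digest, size)
#   fetch_queue.wait_on(digest)
#   assert fetch_queue.wait() == digest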

class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)

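# For illustration (in-memory chunks, using hashlib.sha1 as the hasher):
#
#   import hashlib
#   chunks = ['hello ', 'world']
#   digest = hashlib.sha1(''.join(chunks)).hexdigest()
#   verifier = FetchStreamVerifier(iter(chunks), hashlib.sha1, digest, 11)
#   assert ''.join(verifier.run()) == 'hello world'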

class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self, filter_cb):
    """
    filter_cb: callback function to filter downloaded content.
        When filter_cb is not None, a file is downloaded iff
        filter_cb(filepath) returns True.
    """

    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

    self._filter_cb = filter_cb

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long link of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      """Retrieves an isolated file included by the root bundle."""
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      # This isolated item is being added dynamically, notify FetchQueue.
      fetch_queue.wait_on(h)
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait()
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001171 def _start_fetching_files(self, isolated, fetch_queue):
1172 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001173
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001174 Modifies self.files.
1175 """
maruel10bea7b2016-12-07 05:03:49 -08001176 files = isolated.data.get('files', {})
1177 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
1178 for filepath, properties in files.iteritems():
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001179 if self._filter_cb and not self._filter_cb(filepath):
1180 continue
1181
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001182 # Root isolated has priority on the files being mapped. In particular,
1183 # overridden files must not be fetched.
1184 if filepath not in self.files:
1185 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001186
1187 # Make sure if the isolated is read only, the mode doesn't have write
1188 # bits.
1189 if 'm' in properties and self.read_only:
1190 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1191
1192 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001193 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001194 fetch_queue.add(
1195 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1196
1197 def _update_self(self, node):
1198 """Extracts bundle global parameters from loaded *.isolated file.
1199
1200 Will be called with each loaded *.isolated file in order of traversal of
1201 isolated include graph (see isolated_format.walk_includes).
1202 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001203 # Grabs properties.
1204 if not self.command and node.data.get('command'):
1205 # Ensure paths are correctly separated on windows.
1206 self.command = node.data['command']
1207 if self.command:
1208 self.command[0] = self.command[0].replace('/', os.path.sep)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001209 if self.read_only is None and node.data.get('read_only') is not None:
1210 self.read_only = node.data['read_only']
1211 if (self.relative_cwd is None and
1212 node.data.get('relative_cwd') is not None):
1213 self.relative_cwd = node.data['relative_cwd']
1214
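# For reference, a minimal sketch of the bundle-level keys that
# IsolatedBundle._update_self() consumes from a loaded *.isolated file; the
# values are illustrative:
#
#   {
#     'command': ['./run_test', '--some-flag'],
#     'read_only': 1,
#     'relative_cwd': 'out/Release',
#   }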
1215
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001216def get_storage(server_ref):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001217  """Returns a Storage instance that can upload and download via |server_ref|.
1218
1219 Arguments:
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001220 server_ref: isolate_storage.ServerRef instance.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001221
1222 Returns:
1223 Instance of Storage.
1224 """
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001225 assert isinstance(server_ref, isolate_storage.ServerRef), repr(server_ref)
1226 return Storage(isolate_storage.get_storage_api(server_ref))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001227
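# A minimal usage sketch of get_storage(); the server URL is a placeholder:
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     ...  # e.g. storage.upload_items() or storage.async_fetch()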
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001228
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001229def _map_file(dst, digest, props, cache, read_only, use_symlinks):
1230 """Put downloaded file to destination path. This function is used for multi
1231 threaded file putting.
1232 """
1233 with cache.getfileobj(digest) as srcfileobj:
1234 filetype = props.get('t', 'basic')
1235
1236 if filetype == 'basic':
1237 # Ignore all bits apart from the user.
1238      # Ignore all permission bits apart from the user's.
1239 if read_only:
1240 # Enforce read-only if the root bundle does.
1241 file_mode &= 0500
1242 putfile(srcfileobj, dst, file_mode,
1243 use_symlink=use_symlinks)
1244
1245 elif filetype == 'tar':
1246 basedir = os.path.dirname(dst)
1247 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1248 for ti in t:
1249 if not ti.isfile():
1250 logging.warning(
1251 'Path(%r) is nonfile (%s), skipped',
1252 ti.name, ti.type)
1253 continue
1254 # Handle files created on Windows fetched on POSIX and the
1255 # reverse.
1256 other_sep = '/' if os.path.sep == '\\' else '\\'
1257 name = ti.name.replace(other_sep, os.path.sep)
1258 fp = os.path.normpath(os.path.join(basedir, name))
1259 if not fp.startswith(basedir):
1260 logging.error(
1261 'Path(%r) is outside root directory',
1262                fp)
            # Skip entries that would escape the output directory.
            continue
1263 ifd = t.extractfile(ti)
1264 file_path.ensure_tree(os.path.dirname(fp))
1265 file_mode = ti.mode & 0700
1266 if read_only:
1267 # Enforce read-only if the root bundle does.
1268 file_mode &= 0500
1269 putfile(ifd, fp, file_mode, ti.size)
1270
1271 else:
1272 raise isolated_format.IsolatedError(
1273 'Unknown file type %r' % filetype)
1274
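# A worked example of the mode masking in _map_file() above: a tar entry
# with mode 0775 inside a read-only bundle is written with mode
# 0775 & 0700 & 0500 == 0500, i.e. read and execute for the user only.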
1275
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001276def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1277 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001278  """Aggressively downloads the .isolated file(s), then fetches all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001279
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001280 Arguments:
1281 isolated_hash: hash of the root *.isolated file.
1282 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001283 cache: ContentAddressedCache class that knows how to store and map files
1284 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001285 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001286 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001287 filter_cb: filter that works as whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001288
1289 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001290 IsolatedBundle object that holds details about loaded *.isolated file.
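
  Example (a minimal sketch; the server URL, hash and output directory are
  placeholders):
    server_ref = isolate_storage.ServerRef(
        'https://isolate.example.com', 'default-gzip')
    with get_storage(server_ref) as storage:
      cache = local_caching.MemoryContentAddressedCache()
      bundle = fetch_isolated(
          '<isolated_hash>', storage, cache, u'/tmp/out', use_symlinks=False)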
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001291 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001292 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001293 'fetch_isolated(%s, %s, %s, %s, %s)',
1294 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001295 # Hash algorithm to use, defined by namespace |storage| is using.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001296 algo = storage.server_ref.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001297 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001298 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001299
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001300 with tools.Profiler('GetIsolateds'):
1301 # Optionally support local files by manually adding them to cache.
1302 if not isolated_format.is_valid_hash(isolated_hash, algo):
1303 logging.debug('%s is not a valid hash, assuming a file '
1304 '(algo was %s, hash size was %d)',
1305 isolated_hash, algo(), algo().digest_size)
1306 path = unicode(os.path.abspath(isolated_hash))
1307 try:
1308 isolated_hash = fetch_queue.inject_local_file(path, algo)
1309 except IOError as e:
1310 raise isolated_format.MappingError(
1311          '%s doesn\'t seem to be a valid file. Did you intend to pass a '
1312 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001313
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001314 # Load all *.isolated and start loading rest of the files.
1315 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001316
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001317 with tools.Profiler('GetRest'):
1318 # Create file system hierarchy.
1319 file_path.ensure_tree(outdir)
1320 create_directories(outdir, bundle.files)
Marc-Antoine Rueldcff6462018-12-04 16:35:18 +00001321 _create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001322
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001323 # Ensure working directory exists.
1324 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
1325 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001326
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001327 # Multimap: digest -> list of pairs (path, props).
1328 remaining = {}
1329 for filepath, props in bundle.files.iteritems():
1330 if 'h' in props:
1331 remaining.setdefault(props['h'], []).append((filepath, props))
1332 fetch_queue.wait_on(props['h'])
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001333
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001334 # Now block on the remaining files to be downloaded and mapped.
1335 logging.info('Retrieving remaining files (%d of them)...',
1336 fetch_queue.pending_count)
1337 last_update = time.time()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001338
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001339 with threading_utils.ThreadPool(2, 32, 32) as putfile_thread_pool:
1340 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1341 while remaining:
1342 detector.ping()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001343
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001344 # Wait for any item to finish fetching to cache.
1345 digest = fetch_queue.wait()
tansell9e04a8d2016-07-28 09:31:59 -07001346
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001347 # Create the files in the destination using item in cache as the
1348 # source.
1349 for filepath, props in remaining.pop(digest):
1350 fullpath = os.path.join(outdir, filepath)
tanselle4288c32016-07-28 09:45:40 -07001351
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001352 putfile_thread_pool.add_task(threading_utils.PRIORITY_HIGH,
1353 _map_file, fullpath, digest,
1354 props, cache, bundle.read_only,
1355 use_symlinks)
tanselle4288c32016-07-28 09:45:40 -07001356
Takuto Ikutadeba39d2019-04-04 12:18:39 +00001357 # Report progress.
1358 duration = time.time() - last_update
1359 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1360 msg = '%d files remaining...' % len(remaining)
1361 sys.stdout.write(msg + '\n')
1362 sys.stdout.flush()
1363 logging.info(msg)
1364 last_update = time.time()
1365 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
1366 putfile_thread_pool.join()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001367
Marc-Antoine Ruele9558372018-08-03 03:41:22 +00001368    # Save the cache right away so we don't lose the state of the new objects.
1369 cache.save()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001370 # Cache could evict some items we just tried to fetch, it's a fatal error.
1371 if not fetch_queue.verify_all_cached():
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001372 free_disk = file_path.get_free_space(cache.cache_dir)
1373 msg = (
1374 'Cache is too small to hold all requested files.\n'
1375        ' %s\n cache=%d bytes, %d items; %s bytes of free space') % (
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +00001376 cache.policies, cache.total_size, len(cache), free_disk)
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001377 raise isolated_format.MappingError(msg)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001378 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001379
1380
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001381def _directory_to_metadata(root, algo, blacklist):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001382 """Yields every file and/or symlink found.
1383
1384 Yields:
1385 tuple(FileItem, relpath, metadata)
1386 For a symlink, FileItem is None.
1387 """
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001388 # Current tar file bundle, if any.
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001389 root = file_path.get_native_path_case(root)
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001390 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001391 for relpath, issymlink in isolated_format.expand_directory_and_symlink(
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001392 root,
1393 u'.' + os.path.sep,
1394 blacklist,
1395 follow_symlinks=(sys.platform != 'win32')):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001396
1397 filepath = os.path.join(root, relpath)
1398 if issymlink:
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001399 # TODO(maruel): Do not call this.
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001400 meta = isolated_format.file_to_metadata(filepath, 0, False)
1401 yield None, relpath, meta
1402 continue
1403
1404 prio = relpath.endswith('.isolated')
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001405 if bundle.try_add(FileItem(path=filepath, algo=algo, high_priority=prio)):
1406 # The file was added to the current pending tarball and won't be archived
1407 # individually.
1408 continue
1409
1410 # Flush and reset the bundle.
1411 for i, p, m in bundle.yield_item_path_meta():
1412 yield i, p, m
1413 bundle = TarBundle(root, algo)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001414
1415 # Yield the file individually.
1416 item = FileItem(path=filepath, algo=algo, size=None, high_priority=prio)
1417 yield item, relpath, item.meta
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001418
Marc-Antoine Ruel440eee62018-12-04 22:37:05 +00001419 for i, p, m in bundle.yield_item_path_meta():
1420 yield i, p, m
1421
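# _directory_to_metadata() yields triples shaped roughly like the following;
# paths, digest and metadata values are illustrative placeholders:
#
#   (FileItem(...), u'src/foo.py', {'h': u'<digest>', 's': 123, 'm': 0644})
#
# and for a symlink, which has no content to upload:
#
#   (None, u'src/link', {'l': u'foo.py'})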
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001422
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001423def _print_upload_stats(items, missing):
1424 """Prints upload stats."""
1425 total = len(items)
1426 total_size = sum(f.size for f in items)
1427 logging.info(
1428 'Total: %6d, %9.1fkiB', total, total_size / 1024.)
1429 cache_hit = set(items).difference(missing)
1430 cache_hit_size = sum(f.size for f in cache_hit)
1431 logging.info(
1432 'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1433 len(cache_hit),
1434 cache_hit_size / 1024.,
1435      len(cache_hit) * 100. / total if total else 0,
1436 cache_hit_size * 100. / total_size if total_size else 0)
1437 cache_miss = missing
1438 cache_miss_size = sum(f.size for f in cache_miss)
1439 logging.info(
1440 'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1441 len(cache_miss),
1442 cache_miss_size / 1024.,
1443      len(cache_miss) * 100. / total if total else 0,
1444 cache_miss_size * 100. / total_size if total_size else 0)
1445
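# A worked example of the percentages above: for 10 items totalling
# 2048.0kiB of which 4 items (512.0kiB) were missing from the server, this
# logs a 60.00% files / 75.00% size cache hit and a 40.00% files / 25.00%
# size cache miss.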
1446
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001447def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001448 """Called by archive_files_to_storage for a directory.
1449
1450  Creates an .isolated file describing the directory's contents.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001451
1452 Yields:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001453 FileItem for every file found, plus one for the .isolated file itself.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001454 """
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001455 files = {}
1456 for item, relpath, meta in _directory_to_metadata(
1457 dirpath, hash_algo, blacklist):
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001458 # item is None for a symlink.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001459 files[relpath] = meta
Marc-Antoine Ruel9cd5ef02018-11-29 23:47:34 +00001460 if item:
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001461 yield item
1462
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001463  # TODO(maruel): If there is no file, don't yield an .isolated file.
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001464 data = {
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001465 'algo': hash_algo_name,
1466 'files': files,
1467 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001468 }
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001469 # Keep the file in memory. This is fine because .isolated files are relatively
1470 # small.
1471 yield BufferItem(
1472 tools.format_json(data, True), algo=hash_algo, high_priority=True)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001473
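# Once serialized with tools.format_json(), the 'data' dict above is the
# .isolated file content, roughly (illustrative values):
#
#   {'algo': 'sha-1', 'files': {u'foo.py': {'h': u'<digest>', 's': 123}},
#    'version': isolated_format.ISOLATED_FILE_VERSION}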
1474
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001475def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001476 """Stores every entry into remote storage and returns stats.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001477
1478 Arguments:
1479 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001480 files: iterable of files to upload. If a directory is specified (with a
1481 trailing slash), a .isolated file is created and its hash is returned.
1482 Duplicates are skipped.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001483 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07001484
1485 Returns:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001486 tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001487 The first file in the first item is always the .isolated file.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001488 """
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001489 # Dict of path to hash.
1490 results = collections.OrderedDict()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001491 hash_algo = storage.server_ref.hash_algo
1492 hash_algo_name = storage.server_ref.hash_algo_name
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001493 # Generator of FileItem to pass to upload_items() concurrent operation.
1494 channel = threading_utils.TaskChannel()
1495 uploaded_digests = set()
1496 def _upload_items():
1497 results = storage.upload_items(channel)
1498 uploaded_digests.update(f.digest for f in results)
1499 t = threading.Thread(target=_upload_items)
1500 t.start()
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001501
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001502 # Keep track locally of the items to determine cold and hot items.
1503 items_found = []
1504 try:
1505 for f in files:
1506 assert isinstance(f, unicode), repr(f)
1507 if f in results:
1508 # Duplicate
1509 continue
1510 try:
1511 filepath = os.path.abspath(f)
1512 if fs.isdir(filepath):
1513 # Uploading a whole directory.
1514 item = None
1515 for item in _enqueue_dir(
1516 filepath, blacklist, hash_algo, hash_algo_name):
Marc-Antoine Ruelcc802b02018-11-28 21:05:01 +00001517 channel.send_result(item)
1518 items_found.append(item)
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001519 # The very last item will be the .isolated file.
1520 if not item:
1521 # There was no file in the directory.
1522 continue
1523 elif fs.isfile(filepath):
1524 item = FileItem(
1525 path=filepath,
1526 algo=hash_algo,
1527 size=None,
1528 high_priority=f.endswith('.isolated'))
1529 channel.send_result(item)
1530 items_found.append(item)
1531 else:
1532        raise Error('%s is neither a file nor a directory.' % f)
1533 results[f] = item.digest
1534      except OSError as e:
1535        raise Error('Failed to process %s: %s' % (f, e))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001536 finally:
Marc-Antoine Ruel1b2885d2018-12-04 18:30:33 +00001537 # Stops the generator, so _upload_items() can exit.
1538 channel.send_done()
1539 t.join()
1540
1541 cold = []
1542 hot = []
1543 for i in items_found:
1544 # Note that multiple FileItem may have the same .digest.
1545 if i.digest in uploaded_digests:
1546 cold.append(i)
1547 else:
1548 hot.append(i)
1549 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001550
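# A minimal usage sketch of archive_files_to_storage(); the server URL and
# paths are placeholders, and the blacklist shown omits nothing:
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'/path/to/dir/', u'/path/to/file.txt'],
#         lambda _relpath: False)
#   # results maps each input path to its digest.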
1551
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001552@subcommand.usage('<file1..fileN> or - to read from stdin')
1553def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001554 """Archives data to the server.
1555
1556  If a directory is specified, a .isolated file is created and the whole
1557  directory is uploaded. This .isolated file can then be included in another
1558  one to run commands.
1559
1560  The command outputs each processed file with its content hash. For
1561  directories, the .isolated file generated for the directory is listed as
1562  the directory entry itself.
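
  Example (the server URL is a placeholder):
    isolateserver.py archive -I https://isolate.example.com some_directory/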
1563 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001564 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001565 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001566 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001567 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001568 server_ref = isolate_storage.ServerRef(
1569 options.isolate_server, options.namespace)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001570 if files == ['-']:
1571 files = (l.rstrip('\n\r') for l in sys.stdin)
1572 if not files:
1573 parser.error('Nothing to upload')
1574 files = (f.decode('utf-8') for f in files)
1575 blacklist = tools.gen_blacklist(options.blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001576 try:
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001577 with get_storage(server_ref) as storage:
1578 results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001579 except (Error, local_caching.NoMoreSpace) as e:
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001580 parser.error(e.args[0])
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001581 print('\n'.join('%s %s' % (h, f) for f, h in results.iteritems()))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001582 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001583
1584
1585def CMDdownload(parser, args):
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001586 """Download data from the server.
1587
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001588 It can either download individual files or a complete tree from a .isolated
1589 file.
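
  Example (the server URL and hash are placeholders):
    isolateserver.py download -I https://isolate.example.com \
        -s <isolated_hash> -t out_dir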
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001590 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001591 add_isolate_server_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001592 parser.add_option(
Marc-Antoine Ruel185ded42015-01-28 20:49:18 -05001593 '-s', '--isolated', metavar='HASH',
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001594 help='hash of an isolated file, .isolated file content is discarded, use '
1595 '--file if you need it')
1596 parser.add_option(
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001597 '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
1598 help='hash and destination of a file, can be used multiple times')
1599 parser.add_option(
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001600 '-t', '--target', metavar='DIR', default='download',
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001601 help='destination directory')
maruel4409e302016-07-19 14:25:51 -07001602 parser.add_option(
1603 '--use-symlinks', action='store_true',
1604 help='Use symlinks instead of hardlinks')
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001605 add_cache_options(parser)
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001606 options, args = parser.parse_args(args)
1607 if args:
1608 parser.error('Unsupported arguments: %s' % args)
Marc-Antoine Ruel5028ba22017-08-25 17:37:51 -04001609 if not file_path.enable_symlink():
Marc-Antoine Ruel5a024272019-01-15 20:11:16 +00001610 logging.warning('Symlink support is not enabled')
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001611
nodir55be77b2016-05-03 09:39:57 -07001612 process_isolate_server_options(parser, options, True, True)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001613 if bool(options.isolated) == bool(options.file):
1614 parser.error('Use one of --isolated or --file, and only one.')
maruel4409e302016-07-19 14:25:51 -07001615 if not options.cache and options.use_symlinks:
1616    parser.error('--use-symlinks requires the use of a cache with --cache')
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +00001617
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001618 cache = process_cache_options(options, trim=True)
maruel2e8d0f52016-07-16 07:51:29 -07001619 cache.cleanup()
maruel12e30012015-10-09 11:55:35 -07001620 options.target = unicode(os.path.abspath(options.target))
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001621 if options.isolated:
maruel12e30012015-10-09 11:55:35 -07001622 if (fs.isfile(options.target) or
1623 (fs.isdir(options.target) and fs.listdir(options.target))):
Marc-Antoine Ruelf90861c2015-03-24 20:54:49 -04001624 parser.error(
1625 '--target \'%s\' exists, please use another target' % options.target)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001626 server_ref = isolate_storage.ServerRef(
1627 options.isolate_server, options.namespace)
1628 with get_storage(server_ref) as storage:
Vadim Shtayura3172be52013-12-03 12:49:05 -08001629 # Fetching individual files.
1630 if options.file:
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001631 # TODO(maruel): Enable cache in this case too.
Vadim Shtayura3172be52013-12-03 12:49:05 -08001632 channel = threading_utils.TaskChannel()
1633 pending = {}
1634 for digest, dest in options.file:
Marc-Antoine Ruelcf51ea92017-10-24 17:28:43 -07001635 dest = unicode(dest)
Vadim Shtayura3172be52013-12-03 12:49:05 -08001636 pending[digest] = dest
1637 storage.async_fetch(
1638 channel,
Vadim Shtayura3148e072014-09-02 18:51:52 -07001639 threading_utils.PRIORITY_MED,
Vadim Shtayura3172be52013-12-03 12:49:05 -08001640 digest,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001641 local_caching.UNKNOWN_FILE_SIZE,
1642 functools.partial(
1643 local_caching.file_write, os.path.join(options.target, dest)))
Vadim Shtayura3172be52013-12-03 12:49:05 -08001644 while pending:
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +00001645 fetched = channel.next()
Vadim Shtayura3172be52013-12-03 12:49:05 -08001646 dest = pending.pop(fetched)
1647 logging.info('%s: %s', fetched, dest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001648
Vadim Shtayura3172be52013-12-03 12:49:05 -08001649 # Fetching whole isolated tree.
1650 if options.isolated:
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001651 bundle = fetch_isolated(
1652 isolated_hash=options.isolated,
1653 storage=storage,
1654 cache=cache,
1655 outdir=options.target,
1656 use_symlinks=options.use_symlinks)
1657 cache.trim()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001658 if bundle.command:
1659 rel = os.path.join(options.target, bundle.relative_cwd)
1660        print('To run this test please run from the directory %s:' % rel)
1662 print(' ' + ' '.join(bundle.command))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001663
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001664 return 0
1665
1666
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001667def add_archive_options(parser):
1668 parser.add_option(
1669 '--blacklist',
1670 action='append', default=list(DEFAULT_BLACKLIST),
1671 help='List of regexp to use as blacklist filter when uploading '
1672 'directories')
1673
1674
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001675def add_isolate_server_options(parser):
1676 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001677 parser.add_option(
1678 '-I', '--isolate-server',
1679 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001680 help='URL of the Isolate Server to use. Defaults to the environment '
1681 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1682 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001683 parser.add_option(
aludwind7b7b7e2017-06-29 16:38:50 -07001684 '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
aludwin81178302016-11-30 17:18:49 -08001685 parser.add_option(
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001686 '--namespace', default='default-gzip',
1687 help='The namespace to use on the Isolate Server, default: %default')
1688
1689
nodir55be77b2016-05-03 09:39:57 -07001690def process_isolate_server_options(
1691 parser, options, set_exception_handler, required):
1692 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001693
1694 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001695 """
1696 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001697 if required:
1698 parser.error('--isolate-server is required.')
1699 return
1700
aludwind7b7b7e2017-06-29 16:38:50 -07001701 if options.grpc_proxy:
1702 isolate_storage.set_grpc_proxy(options.grpc_proxy)
aludwin81178302016-11-30 17:18:49 -08001703 else:
1704 try:
1705 options.isolate_server = net.fix_url(options.isolate_server)
1706 except ValueError as e:
1707 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001708 if set_exception_handler:
1709 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001710 try:
1711 return auth.ensure_logged_in(options.isolate_server)
1712 except ValueError as e:
1713 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001714
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001715
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001716def add_cache_options(parser):
1717 cache_group = optparse.OptionGroup(parser, 'Cache management')
1718 cache_group.add_option(
Marc-Antoine Ruel5aeb3bb2018-06-16 13:11:02 +00001719 '--cache', metavar='DIR', default='cache',
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001720 help='Directory to keep a local cache of the files. Accelerates download '
1721 'by reusing already downloaded files. Default=%default')
1722 cache_group.add_option(
1723 '--max-cache-size',
1724 type='int',
1725 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08001726 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001727 help='Trim if the cache gets larger than this value, default=%default')
1728 cache_group.add_option(
1729 '--min-free-space',
1730 type='int',
1731 metavar='NNN',
1732 default=2*1024*1024*1024,
1733 help='Trim if disk free space becomes lower than this value, '
1734 'default=%default')
1735 cache_group.add_option(
1736 '--max-items',
1737 type='int',
1738 metavar='NNN',
1739 default=100000,
1740 help='Trim if more than this number of items are in the cache '
1741 'default=%default')
1742 parser.add_option_group(cache_group)
1743
1744
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001745def process_cache_options(options, trim, **kwargs):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001746 if options.cache:
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -04001747 policies = local_caching.CachePolicies(
1748 options.max_cache_size,
1749 options.min_free_space,
1750 options.max_items,
1751 # 3 weeks.
1752 max_age_secs=21*24*60*60)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001753
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001754 # |options.cache| path may not exist until DiskContentAddressedCache()
1755 # instance is created.
1756 return local_caching.DiskContentAddressedCache(
Marc-Antoine Ruel79d42192019-02-06 19:24:16 +00001757 unicode(os.path.abspath(options.cache)), policies, trim, **kwargs)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001758 else:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001759 return local_caching.MemoryContentAddressedCache()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001760
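# With the defaults from add_cache_options() above, the resulting policies
# trim the cache once it exceeds 50GiB or 100000 items, when free disk
# space drops below 2GiB, or when entries become older than 3 weeks.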
1761
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001762class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001763 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001764 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001765 self,
1766 version=__version__,
1767 prog=os.path.basename(sys.modules[__name__].__file__),
1768 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001769 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001770
1771 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001772 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001773 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08001774 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001775 return options, args
1776
1777
1778def main(args):
1779 dispatcher = subcommand.CommandDispatcher(__name__)
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -04001780 return dispatcher.execute(OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001781
1782
1783if __name__ == '__main__':
maruel8e4e40c2016-05-30 06:21:07 -07001784 subprocess42.inhibit_os_error_reporting()
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001785 fix_encoding.fix_encoding()
1786 tools.disable_buffering()
1787 colorama.init()
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001788 sys.exit(main(sys.argv[1:]))