blob: 6a49e689dc9da8174973141d9b9eb8a5a1e724c7 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruelea586f32016-04-05 11:11:33 -07002# Copyright 2013 The LUCI Authors. All rights reserved.
maruelf1f5e2a2016-05-25 17:10:39 -07003# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00008__version__ = '0.9.0'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +000010import collections
nodir90bc8dc2016-06-15 13:35:21 -070011import errno
tansell9e04a8d2016-07-28 09:31:59 -070012import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000013import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040014import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000015import os
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +000016import Queue
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000017import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040018import signal
tansell9e04a8d2016-07-28 09:31:59 -070019import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000020import sys
tansell26de79e2016-11-13 18:41:11 -080021import tarfile
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050022import tempfile
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +000023import threading
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000024import time
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000025import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000026
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000027from third_party import colorama
28from third_party.depot_tools import fix_encoding
29from third_party.depot_tools import subcommand
30
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050031from utils import file_path
maruel12e30012015-10-09 11:55:35 -070032from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040033from utils import logging_utils
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000034from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040035from utils import on_error
maruel8e4e40c2016-05-30 06:21:07 -070036from utils import subprocess42
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000037from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000038from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000039
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080040import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040041import isolated_format
aludwin81178302016-11-30 17:18:49 -080042import isolate_storage
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -040043import local_caching
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080044
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000045
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded. Extensions are listed without the
# leading dot.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of paths that are skipped by default when archiving a tree.
DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
90
91
class Error(Exception):
  """Base class for all runtime errors raised by this module."""
95
96
class Aborted(Error):
  """Operation aborted."""
100
101
class AlreadyExists(Error):
  """File already exists.

  Raised by _create_symlinks() when the symlink destination already exists
  (os.symlink failed with EEXIST).
  """
105
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields the content of the file at |path| in chunks of |chunk_size| bytes.

  Reading starts at byte |offset| and continues until EOF.
  """
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    chunk = f.read(chunk_size)
    while chunk:
      yield chunk
      chunk = f.read(chunk_size)
116
117
def fileobj_path(fileobj):
  """Returns the file system path backing a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # A file opened through e.g. open("test.txt") (possibly by a function outside
  # our control, like the standard library) carries a str name. All paths in
  # this code base are unicode objects, so decode it. UTF-8 is (incorrectly)
  # assumed everywhere.
  if not isinstance(name, unicode):
    name = name.decode('utf-8')

  # fs.exists asserts on relative paths; give up on those instead of crashing.
  if not os.path.isabs(name):
    return None

  return name if fs.exists(name) else None
143
144
145# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
146# wrappers have been created.
147def fileobj_copy(
148 dstfileobj, srcfileobj, size=-1,
149 chunk_size=isolated_format.DISK_FILE_CHUNK):
150 """Copy data from srcfileobj to dstfileobj.
151
152 Providing size means exactly that amount of data will be copied (if there
153 isn't enough data, an IOError exception is thrown). Otherwise all data until
154 the EOF marker will be copied.
155 """
156 if size == -1 and hasattr(srcfileobj, 'tell'):
157 if srcfileobj.tell() != 0:
158 raise IOError('partial file but not using size')
159
160 written = 0
161 while written != size:
162 readsize = chunk_size
163 if size > 0:
164 readsize = min(readsize, size-written)
165 data = srcfileobj.read(readsize)
166 if not data:
167 if size == -1:
168 break
169 raise IOError('partial file, got %s, wanted %s' % (written, size))
170 dstfileobj.write(data)
171 written += len(data)
172
173
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, cause a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlink per
    partition. This is why the function automatically fallbacks to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal report that ext2 is known to be potentially faulty on high rate
    of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx

  Arguments:
    srcfileobj: source file-like object.
    dstpath: destination file path.
    file_mode: if not None, mode to fs.chmod() |dstpath| to after writing.
    size: exact number of bytes expected, or -1 for "until EOF".
    use_symlink: prefer a symlink over a hardlink when linking is possible.
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # A file is only safe to link into place when nobody can modify it through
    # the new link, i.e. when no write bit is set. The previous expression was
    # missing the negation: it classified *writable* files as read only and
    # linked them (risking corruption of the cached copy) while needlessly
    # copying genuinely read only files.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # No backing path (or an exact size was requested): stream the content out.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
236
237
def zip_compress(content_generator, level=7):
  """Yields zlib-compressed chunks for the chunks read from |content_generator|.

  |level| is the zlib compression level (0 = store, 9 = best compression).
  """
  compressor = zlib.compressobj(level)
  for piece in content_generator:
    out = compressor.compress(piece)
    if out:
      yield out
  trailing = compressor.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
248
249
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  # Total compressed bytes consumed so far, only used for error reporting.
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      # decompress() stopped after producing |chunk_size| output bytes; keep
      # draining the leftover input in bounded chunks.
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
280
281
def _get_zip_compression_level(filename):
  """Returns the zip compression level to use for |filename|.

  Files whose extension marks them as already compressed are stored without
  recompression (level 0); everything else uses level 7.
  """
  # os.path.splitext() keeps the leading dot (e.g. '.png'), while the entries
  # in ALREADY_COMPRESSED_TYPES are listed without one; strip it so the
  # membership test can actually match. Previously the comparison never
  # matched and already-compressed files were recompressed at level 7.
  file_ext = os.path.splitext(filename)[1].lstrip('.').lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
287
288
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Accumulate every ancestor directory of every file.
  directories = set()
  for f in files:
    parent = os.path.dirname(f)
    while parent and parent not in directories:
      directories.add(parent)
      parent = os.path.dirname(parent)
  # Sorted order guarantees parents are created before their children.
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000303
304
Marc-Antoine Rueldcff6462018-12-04 16:35:18 +0000305def _create_symlinks(base_directory, files):
Marc-Antoine Ruelccafe0e2013-11-08 16:15:36 -0500306 """Creates any symlinks needed by the given set of files."""
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000307 for filepath, properties in files:
308 if 'l' not in properties:
309 continue
310 if sys.platform == 'win32':
Marc-Antoine Ruelccafe0e2013-11-08 16:15:36 -0500311 # TODO(maruel): Create symlink via the win32 api.
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000312 logging.warning('Ignoring symlink %s', filepath)
313 continue
314 outfile = os.path.join(base_directory, filepath)
nodir90bc8dc2016-06-15 13:35:21 -0700315 try:
316 os.symlink(properties['l'], outfile) # pylint: disable=E1101
317 except OSError as e:
318 if e.errno == errno.EEXIST:
319 raise AlreadyExists('File %s already exists.' % outfile)
320 raise
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000321
322
class _ThreadFile(object):
  """Write-only file-like object whose writes are consumed by another thread.

  Used by TarBundle: one thread write()s tar data into it while another
  iterates over the instance to stream the chunks out.
  """

  def __init__(self):
    self._chunks = threading_utils.TaskChannel()
    self._position = 0

  def __iter__(self):
    # Iterating yields every chunk written so far, until close() is called.
    return self._chunks

  def tell(self):
    return self._position

  def write(self, b):
    self._chunks.send_result(b)
    self._position += len(b)

  def close(self):
    # Signals the consuming iterator that no more chunks are coming.
    self._chunks.send_done()
341
342
class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, algo, digest=None, size=None, high_priority=False):
    """Arguments:
      path: file path on disk.
      algo: hashing algorithm constructor (e.g. hashlib.sha1) used to compute
          the digest lazily when it was not provided.
      digest: precomputed content digest, if known.
      size: precomputed file size in bytes; stat()ed from disk when None.
      high_priority: True to upload this item before normal-priority ones.
    """
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority,
        compression_level=_get_zip_compression_level(path))
    self._path = path
    self._algo = algo
    self._meta = None

  @property
  def path(self):
    """Path of the file on disk."""
    return self._path

  @property
  def digest(self):
    """Content digest; hashes the file on first access if not precomputed."""
    if not self._digest:
      self._digest = isolated_format.hash_file(self._path, self._algo)
    return self._digest

  @property
  def meta(self):
    """Lazily built .isolated metadata dict for this file."""
    if not self._meta:
      # TODO(maruel): Inline.
      self._meta = isolated_format.file_to_metadata(self.path, 0, False)
      # We need to hash right away.
      self._meta['h'] = self.digest
    return self._meta

  def content(self):
    """Yields the file content in chunks."""
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000381
382
class TarBundle(isolate_storage.Item):
  """Tarfile to push to Storage.

  Its digest is the digest of all the files it contains. It is generated on the
  fly.
  """

  def __init__(self, root, algo):
    # 2 trailing 512 bytes headers.
    super(TarBundle, self).__init__(size=1024)
    # Files queued for inclusion in this tarball.
    self._items = []
    # Metadata dict filled in by _prepare().
    self._meta = None
    # Hashing algorithm constructor used by _prepare().
    self._algo = algo
    # Length of the root prefix (plus separator) to strip from item paths.
    self._root_len = len(root) + 1
    # Same value as for Go.
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/tar_archiver.go
    # https://chromium.googlesource.com/infra/luci/luci-go.git/+/master/client/archiver/upload_tracker.go
    self._archive_max_size = int(10e6)

  @property
  def digest(self):
    # Computed lazily: requires streaming the whole tarball once.
    if not self._digest:
      self._prepare()
    return self._digest

  @property
  def size(self):
    if self._size is None:
      self._prepare()
    return self._size

  def try_add(self, item):
    """Try to add this file to the bundle.

    It is extremely naive but this should be just enough for
    https://crbug.com/825418.

    Future improvements should be in the Go code, and the Swarming bot should be
    migrated to use the Go code instead.

    NOTE: bundling is currently disabled on purpose — the unconditional
    'return False' below makes every call reject the item, so the code after
    it is intentionally unreachable (see https://crbug.com/825418).
    """
    if not item.size:
      return False
    # pylint: disable=unreachable
    # Round up to the tar 512-byte block size (content + one header block).
    rounded = (item.size + 512) & ~511
    if rounded + self._size > self._archive_max_size:
      return False
    # https://crbug.com/825418
    return False
    self._size += rounded
    self._items.append(item)
    return True

  def yield_item_path_meta(self):
    """Returns a tuple(Item, filepath, meta_dict).

    If the bundle contains less than 5 items, the items are yielded.
    """
    if len(self._items) < 5:
      # The tarball is too small, yield individual items, if any.
      for item in self._items:
        yield item, item.path[self._root_len:], item.meta
    else:
      # This ensures self._meta is set.
      p = self.digest + '.tar'
      # Yield itself as a tarball.
      yield self, p, self._meta

  def content(self):
    """Generates the tarfile content on the fly."""
    obj = _ThreadFile()
    def _tar_thread():
      # Writes the tar stream into |obj| while the main generator below
      # consumes it chunk by chunk.
      try:
        t = tarfile.open(
            fileobj=obj, mode='w', format=tarfile.PAX_FORMAT, encoding='utf-8')
        for item in self._items:
          logging.info(' tarring %s', item.path)
          t.add(item.path)
        t.close()
      except Exception:
        logging.exception('Internal failure')
      finally:
        # Always unblocks the consumer, even on failure.
        obj.close()

    t = threading.Thread(target=_tar_thread)
    t.start()
    try:
      for data in obj:
        yield data
    finally:
      t.join()

  def _prepare(self):
    # Streams the whole tarball once to compute digest, size and metadata.
    h = self._algo()
    total = 0
    for chunk in self.content():
      h.update(chunk)
      total += len(chunk)
    # pylint: disable=attribute-defined-outside-init
    # This is not true, they are defined in Item.__init__().
    self._digest = h.hexdigest()
    self._size = total
    self._meta = {
      'h': self.digest,
      's': self.size,
      't': u'tar',
    }
489
490
class BufferItem(isolate_storage.Item):
  """An in-memory byte buffer to push to Storage."""

  def __init__(self, buf, algo, high_priority=False):
    # The digest and size are known up front since the data is in memory.
    content_digest = algo(buf).hexdigest()
    super(BufferItem, self).__init__(
        digest=content_digest,
        size=len(buf),
        high_priority=high_priority)
    self._buffer = buf

  def content(self):
    # Single-chunk iterable, matching the Item.content() contract.
    return [self._buffer]
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000503
504
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000505class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800506 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000507
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800508 Implements compression support, parallel 'contains' checks, parallel uploads
509 and more.
510
511 Works only within single namespace (and thus hashing algorithm and compression
512 scheme are fixed).
513
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400514 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
515 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800516 """
517
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700518 def __init__(self, storage_api):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000519 self._storage_api = storage_api
520 self._cpu_thread_pool = None
521 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400522 self._aborted = False
523 self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000524
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000525 @property
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000526 def server_ref(self):
527 """Shortcut to get the server_ref from storage_api.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700528
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000529 This can be used to get the underlying hash_algo.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700530 """
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000531 return self._storage_api.server_ref
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700532
533 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000534 def cpu_thread_pool(self):
535 """ThreadPool for CPU-bound tasks like zipping."""
536 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500537 threads = max(threading_utils.num_processors(), 2)
538 if sys.maxsize <= 2L**32:
539 # On 32 bits userland, do not try to use more than 16 threads.
540 threads = min(threads, 16)
541 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000542 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000543
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000544 @property
545 def net_thread_pool(self):
546 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
547 if self._net_thread_pool is None:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700548 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000549 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000550
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000551 def close(self):
552 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400553 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000554 if self._cpu_thread_pool:
555 self._cpu_thread_pool.join()
556 self._cpu_thread_pool.close()
557 self._cpu_thread_pool = None
558 if self._net_thread_pool:
559 self._net_thread_pool.join()
560 self._net_thread_pool.close()
561 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400562 logging.info('Done.')
563
564 def abort(self):
565 """Cancels any pending or future operations."""
566 # This is not strictly theadsafe, but in the worst case the logging message
567 # will be printed twice. Not a big deal. In other places it is assumed that
568 # unprotected reads and writes to _aborted are serializable (it is true
569 # for python) and thus no locking is used.
570 if not self._aborted:
571 logging.warning('Aborting... It can take a while.')
572 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000573
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000574 def __enter__(self):
575 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400576 assert not self._prev_sig_handlers, self._prev_sig_handlers
577 for s in (signal.SIGINT, signal.SIGTERM):
578 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000579 return self
580
581 def __exit__(self, _exc_type, _exc_value, _traceback):
582 """Context manager interface."""
583 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400584 while self._prev_sig_handlers:
585 s, h = self._prev_sig_handlers.popitem()
586 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000587 return False
588
  def upload_items(self, items):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main threads enumerates 'items' and pushes to the first thread. Then it
    join() all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread          Thread #1
     (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                    Thread #2
         |      |
         v      v
    (missing)  (was on server)
         |
         v
    _handle_missing_thread                Thread #3
         |
         v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represents data to
             upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # Pipeline queues. A None (or (None, None)) entry is the end-of-stream
    # sentinel for each stage.
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input: incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            # |item| is None for end-of-stream, False on a 3s timeout,
            # otherwise an Item to batch up.
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          # Flush when the batch is full, or partially on timeout/sentinel.
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              # Ramp up the batch size following ITEMS_PER_CONTAINS_QUERIES.
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input: batches_to_lookup
      Output: missing, to_upload
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          # Eagerly drain already-completed lookups without blocking.
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.iteritems():
              missing.put((missing_item, push_state))
        # All batches dispatched; block for the remaining lookups.
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().iteritems():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input: missing
      Output: uploaded
      """
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        channel = threading_utils.TaskChannel()
        pending_upload = 0
        while not self._aborted:
          try:
            missing_item, push_state = missing.get(True, timeout=5)
            if missing_item is None:
              break
            self._async_push(channel, missing_item, push_state)
            pending_upload += 1
          except Queue.Empty:
            pass
          # Proves forward progress to the deadlock detector.
          detector.ping()
          # Non-blocking drain of completed uploads.
          while not self._aborted and pending_upload:
            try:
              item = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
        # All missing items dispatched; block for the remaining uploads.
        while not self._aborted and pending_upload:
          item = channel.next()
          uploaded.append(item)
          pending_upload -= 1
          logging.debug(
              'Uploaded %d; %d pending: %s (%d)',
              len(uploaded), pending_upload, item.digest, item.size)

    threads = [
        threading.Thread(target=_create_items_batches_thread),
        threading.Thread(target=_do_lookups_thread),
        threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only first isolate_storage.Item that matches it.
      # All other items are just indistinguishable copies from the point of view
      # of isolate server (it doesn't care about paths at all, only content and
      # digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger items
        # being processed first. This is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded
768
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000769 def _async_push(self, channel, item, push_state):
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000770 """Starts asynchronous push to the server in a parallel thread.
771
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000772 Can be used only after |item| was checked for presence on a server with a
773 /contains RPC.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800774
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000775 Arguments:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000776 channel: TaskChannel that receives back |item| when upload ends.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400777 item: item to upload as instance of isolate_storage.Item class.
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000778 push_state: push state returned by storage_api.contains(). It contains
779 storage specific information describing how to upload the item (for
780 example in case of cloud storage, it is signed upload URLs).
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800781
782 Returns:
783 None, but |channel| later receives back |item| when upload ends.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000784 """
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800785 # Thread pool task priority.
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400786 priority = (
Vadim Shtayura3148e072014-09-02 18:51:52 -0700787 threading_utils.PRIORITY_HIGH if item.high_priority
788 else threading_utils.PRIORITY_MED)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800789
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000790 def _push(content):
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400791 """Pushes an isolate_storage.Item and returns it to |channel|."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400792 if self._aborted:
793 raise Aborted()
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800794 self._storage_api.push(item, push_state, content)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000795 return item
796
Wei Huang1a38fbe2017-11-28 22:55:22 -0500797 # If zipping is not required, just start a push task. Don't pass 'content'
798 # so that it can create a new generator when it retries on failures.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000799 if not self.server_ref.is_with_compression:
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000800 self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000801 return
802
803 # If zipping is enabled, zip in a separate thread.
804 def zip_and_push():
805 # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
806 # content right here. It will block until all file is zipped.
807 try:
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400808 if self._aborted:
809 raise Aborted()
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800810 stream = zip_compress(item.content(), item.compression_level)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000811 data = ''.join(stream)
812 except Exception as exc:
813 logging.error('Failed to zip \'%s\': %s', item, exc)
Vadim Shtayura0ffc4092013-11-20 17:49:52 -0800814 channel.send_exception()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000815 return
Wei Huang1a38fbe2017-11-28 22:55:22 -0500816 # Pass '[data]' explicitly because the compressed data is not same as the
817 # one provided by 'item'. Since '[data]' is a list, it can safely be
818 # reused during retries.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000819 self.net_thread_pool.add_task_with_channel(
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000820 channel, priority, _push, [data])
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000821 self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000822
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800823 def push(self, item, push_state):
824 """Synchronously pushes a single item to the server.
825
826 If you need to push many items at once, consider using 'upload_items' or
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000827 '_async_push' with instance of TaskChannel.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800828
829 Arguments:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400830 item: item to upload as instance of isolate_storage.Item class.
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000831 push_state: push state returned by storage_api.contains(). It contains
832 storage specific information describing how to upload the item (for
833 example in case of cloud storage, it is signed upload URLs).
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800834
835 Returns:
836 Pushed item (same object as |item|).
837 """
838 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700839 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +0000840 self._async_push(channel, item, push_state)
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +0000841 pushed = channel.next()
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800842 assert pushed is item
843 return item
844
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000845 def async_fetch(self, channel, priority, digest, size, sink):
846 """Starts asynchronous fetch from the server in a parallel thread.
847
848 Arguments:
849 channel: TaskChannel that receives back |digest| when download ends.
850 priority: thread pool task priority for the fetch.
851 digest: hex digest of an item to download.
852 size: expected size of the item (after decompression).
853 sink: function that will be called as sink(generator).
854 """
855 def fetch():
856 try:
857 # Prepare reading pipeline.
Adrian Ludwinb4ebc092017-09-13 07:46:24 -0400858 stream = self._storage_api.fetch(digest, size, 0)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000859 if self.server_ref.is_with_compression:
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -0400860 stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000861 # Run |stream| through verifier that will assert its size.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000862 verifier = FetchStreamVerifier(
863 stream, self.server_ref.hash_algo, digest, size)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000864 # Verified stream goes to |sink|.
865 sink(verifier.run())
866 except Exception as err:
Vadim Shtayura0ffc4092013-11-20 17:49:52 -0800867 logging.error('Failed to fetch %s: %s', digest, err)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000868 raise
869 return digest
870
871 # Don't bother with zip_thread_pool for decompression. Decompression is
872 # really fast and most probably IO bound anyway.
873 self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
874
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000875
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000876class FetchQueue(object):
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400877 """Fetches items from Storage and places them into ContentAddressedCache.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000878
879 It manages multiple concurrent fetch operations. Acts as a bridge between
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400880 Storage and ContentAddressedCache so that Storage and ContentAddressedCache
881 don't depend on each other at all.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000882 """
883
884 def __init__(self, storage, cache):
885 self.storage = storage
886 self.cache = cache
887 self._channel = threading_utils.TaskChannel()
888 self._pending = set()
889 self._accessed = set()
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +0000890 self._fetched = set(cache)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400891 # Pending digests that the caller waits for, see wait_on()/wait().
892 self._waiting_on = set()
893 # Already fetched digests the caller waits for which are not yet returned by
894 # wait().
895 self._waiting_on_ready = set()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000896
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400897 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700898 self,
899 digest,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400900 size=local_caching.UNKNOWN_FILE_SIZE,
Vadim Shtayura3148e072014-09-02 18:51:52 -0700901 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000902 """Starts asynchronous fetch of item |digest|."""
903 # Fetching it now?
904 if digest in self._pending:
905 return
906
907 # Mark this file as in use, verify_all_cached will later ensure it is still
908 # in cache.
909 self._accessed.add(digest)
910
911 # Already fetched? Notify cache to update item's LRU position.
912 if digest in self._fetched:
913 # 'touch' returns True if item is in cache and not corrupted.
914 if self.cache.touch(digest, size):
915 return
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +0000916 logging.error('%s is corrupted', digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000917 self._fetched.remove(digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000918
919 # TODO(maruel): It should look at the free disk space, the current cache
920 # size and the size of the new item on every new item:
921 # - Trim the cache as more entries are listed when free disk space is low,
922 # otherwise if the amount of data downloaded during the run > free disk
923 # space, it'll crash.
924 # - Make sure there's enough free disk space to fit all dependencies of
925 # this run! If not, abort early.
926
927 # Start fetching.
928 self._pending.add(digest)
929 self.storage.async_fetch(
930 self._channel, priority, digest, size,
931 functools.partial(self.cache.write, digest))
932
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400933 def wait_on(self, digest):
934 """Updates digests to be waited on by 'wait'."""
935 # Calculate once the already fetched items. These will be retrieved first.
936 if digest in self._fetched:
937 self._waiting_on_ready.add(digest)
938 else:
939 self._waiting_on.add(digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000940
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400941 def wait(self):
942 """Waits until any of waited-on items is retrieved.
943
944 Once this happens, it is remove from the waited-on set and returned.
945
946 This function is called in two waves. The first wave it is done for HIGH
947 priority items, the isolated files themselves. The second wave it is called
948 for all the files.
949
950 If the waited-on set is empty, raises RuntimeError.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000951 """
952 # Flush any already fetched items.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400953 if self._waiting_on_ready:
954 return self._waiting_on_ready.pop()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000955
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400956 assert self._waiting_on, 'Needs items to wait on'
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000957
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400958 # Wait for one waited-on item to be fetched.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000959 while self._pending:
Marc-Antoine Ruel4494b6c2018-11-28 21:00:41 +0000960 digest = self._channel.next()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000961 self._pending.remove(digest)
962 self._fetched.add(digest)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400963 if digest in self._waiting_on:
964 self._waiting_on.remove(digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000965 return digest
966
967 # Should never reach this point due to assert above.
968 raise RuntimeError('Impossible state')
969
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400970 @property
971 def wait_queue_empty(self):
972 """Returns True if there is no digest left for wait() to return."""
973 return not self._waiting_on and not self._waiting_on_ready
974
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000975 def inject_local_file(self, path, algo):
976 """Adds local file to the cache as if it was fetched from storage."""
maruel12e30012015-10-09 11:55:35 -0700977 with fs.open(path, 'rb') as f:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000978 data = f.read()
979 digest = algo(data).hexdigest()
980 self.cache.write(digest, [data])
981 self._fetched.add(digest)
982 return digest
983
984 @property
985 def pending_count(self):
986 """Returns number of items to be fetched."""
987 return len(self._pending)
988
989 def verify_all_cached(self):
990 """True if all accessed items are in cache."""
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +0000991 # Not thread safe, but called after all work is done.
992 return self._accessed.issubset(self.cache)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000993
994
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
      * stream: an iterable yielding chunks of content
      * hasher: an object from hashlib that supports update() and hexdigest()
        (eg, hashlib.sha1).
      * expected_digest: if the entire stream is piped through hasher and then
        summarized via hexdigest(), this should be the result. That is, it
        should be a hex string like 'abc123'.
      * expected_size: either the expected size of the stream, or
        local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    # Number of bytes seen so far; compared to expected_size at the end.
    self.current_size = 0
    # Incremental digest of everything seen so far.
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)
    else:
      # The stream produced no chunks at all. Previously this case skipped
      # verification entirely, so a fetch truncated to zero bytes would pass
      # silently. Run the final inspection on an empty chunk so the size and
      # digest of an empty payload are still checked.
      self._inspect_chunk('', is_last=True)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer.

    Raises:
      IOError if, once the last chunk has been seen, the accumulated size or
      digest doesn't match the expected values.
    """
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001068
1069
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001070class IsolatedBundle(object):
1071 """Fetched and parsed .isolated file with all dependencies."""
1072
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001073 def __init__(self, filter_cb):
1074 """
1075 filter_cb: callback function to filter downloaded content.
1076 When filter_cb is not None, Isolated file is downloaded iff
1077 filter_cb(filepath) returns True.
1078 """
1079
Vadim Shtayura3148e072014-09-02 18:51:52 -07001080 self.command = []
1081 self.files = {}
1082 self.read_only = None
1083 self.relative_cwd = None
1084 # The main .isolated file, a IsolatedFile instance.
1085 self.root = None
1086
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001087 self._filter_cb = filter_cb
1088
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001089 def fetch(self, fetch_queue, root_isolated_hash, algo):
1090 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001091
1092 It enables support for "included" .isolated files. They are processed in
1093 strict order but fetched asynchronously from the cache. This is important so
1094 that a file in an included .isolated file that is overridden by an embedding
1095 .isolated file is not fetched needlessly. The includes are fetched in one
1096 pass and the files are fetched as soon as all the ones on the left-side
1097 of the tree were fetched.
1098
1099 The prioritization is very important here for nested .isolated files.
1100 'includes' have the highest priority and the algorithm is optimized for both
1101 deep and wide trees. A deep one is a long link of .isolated files referenced
1102 one at a time by one item in 'includes'. A wide one has a large number of
1103 'includes' in a single .isolated file. 'left' is defined as an included
1104 .isolated file earlier in the 'includes' list. So the order of the elements
1105 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001106
1107 As a side effect this method starts asynchronous fetch of all data files
1108 by adding them to |fetch_queue|. It doesn't wait for data files to finish
1109 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001110 """
1111 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
1112
1113 # Isolated files being retrieved now: hash -> IsolatedFile instance.
1114 pending = {}
1115 # Set of hashes of already retrieved items to refuse recursive includes.
1116 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001117 # Set of IsolatedFile's whose data files have already being fetched.
1118 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001119
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001120 def retrieve_async(isolated_file):
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001121 """Retrieves an isolated file included by the root bundle."""
Vadim Shtayura3148e072014-09-02 18:51:52 -07001122 h = isolated_file.obj_hash
1123 if h in seen:
1124 raise isolated_format.IsolatedError(
1125 'IsolatedFile %s is retrieved recursively' % h)
1126 assert h not in pending
1127 seen.add(h)
1128 pending[h] = isolated_file
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001129 # This isolated item is being added dynamically, notify FetchQueue.
1130 fetch_queue.wait_on(h)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001131 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
1132
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001133 # Start fetching root *.isolated file (single file, not the whole bundle).
1134 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001135
1136 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001137 # Wait until some *.isolated file is fetched, parse it.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001138 item_hash = fetch_queue.wait()
Vadim Shtayura3148e072014-09-02 18:51:52 -07001139 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -07001140 with fetch_queue.cache.getfileobj(item_hash) as f:
1141 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -07001142
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001143 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001144 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001145 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001146
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001147 # Always fetch *.isolated files in traversal order, waiting if necessary
1148 # until next to-be-processed node loads. "Waiting" is done by yielding
1149 # back to the outer loop, that waits until some *.isolated is loaded.
1150 for node in isolated_format.walk_includes(self.root):
1151 if node not in processed:
1152 # Not visited, and not yet loaded -> wait for it to load.
1153 if not node.is_loaded:
1154 break
1155 # Not visited and loaded -> process it and continue the traversal.
1156 self._start_fetching_files(node, fetch_queue)
1157 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001158
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001159 # All *.isolated files should be processed by now and only them.
1160 all_isolateds = set(isolated_format.walk_includes(self.root))
1161 assert all_isolateds == processed, (all_isolateds, processed)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -04001162 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
Vadim Shtayura3148e072014-09-02 18:51:52 -07001163
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001164 # Extract 'command' and other bundle properties.
1165 for node in isolated_format.walk_includes(self.root):
1166 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001167 self.relative_cwd = self.relative_cwd or ''
1168
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001169 def _start_fetching_files(self, isolated, fetch_queue):
1170 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -07001171
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001172 Modifies self.files.
1173 """
maruel10bea7b2016-12-07 05:03:49 -08001174 files = isolated.data.get('files', {})
1175 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
1176 for filepath, properties in files.iteritems():
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001177 if self._filter_cb and not self._filter_cb(filepath):
1178 continue
1179
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001180 # Root isolated has priority on the files being mapped. In particular,
1181 # overridden files must not be fetched.
1182 if filepath not in self.files:
1183 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001184
1185 # Make sure if the isolated is read only, the mode doesn't have write
1186 # bits.
1187 if 'm' in properties and self.read_only:
1188 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1189
1190 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001191 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001192 fetch_queue.add(
1193 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1194
1195 def _update_self(self, node):
1196 """Extracts bundle global parameters from loaded *.isolated file.
1197
1198 Will be called with each loaded *.isolated file in order of traversal of
1199 isolated include graph (see isolated_format.walk_includes).
1200 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001201 # Grabs properties.
1202 if not self.command and node.data.get('command'):
1203 # Ensure paths are correctly separated on windows.
1204 self.command = node.data['command']
1205 if self.command:
1206 self.command[0] = self.command[0].replace('/', os.path.sep)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001207 if self.read_only is None and node.data.get('read_only') is not None:
1208 self.read_only = node.data['read_only']
1209 if (self.relative_cwd is None and
1210 node.data.get('relative_cwd') is not None):
1211 self.relative_cwd = node.data['relative_cwd']
1212
1213
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001214def get_storage(server_ref):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001215 """Returns Storage class that can upload and download from |namespace|.
1216
1217 Arguments:
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001218 server_ref: isolate_storage.ServerRef instance.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001219
1220 Returns:
1221 Instance of Storage.
1222 """
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001223 assert isinstance(server_ref, isolate_storage.ServerRef), repr(server_ref)
1224 return Storage(isolate_storage.get_storage_api(server_ref))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001225
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001226
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001227def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1228 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001229 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001230
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001231 Arguments:
1232 isolated_hash: hash of the root *.isolated file.
1233 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001234 cache: ContentAddressedCache class that knows how to store and map files
1235 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001236 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001237 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001238 filter_cb: filter that works as whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001239
1240 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001241 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001242 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001243 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001244 'fetch_isolated(%s, %s, %s, %s, %s)',
1245 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001246 # Hash algorithm to use, defined by namespace |storage| is using.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001247 algo = storage.server_ref.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001248 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001249 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001250
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001251 with tools.Profiler('GetIsolateds'):
1252 # Optionally support local files by manually adding them to cache.
1253 if not isolated_format.is_valid_hash(isolated_hash, algo):
1254 logging.debug('%s is not a valid hash, assuming a file '
1255 '(algo was %s, hash size was %d)',
1256 isolated_hash, algo(), algo().digest_size)
1257 path = unicode(os.path.abspath(isolated_hash))
1258 try:
1259 isolated_hash = fetch_queue.inject_local_file(path, algo)
1260 except IOError as e:
1261 raise isolated_format.MappingError(
1262 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
1263 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001264
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001265 # Load all *.isolated and start loading rest of the files.
1266 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001267
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001268 with tools.Profiler('GetRest'):
1269 # Create file system hierarchy.
1270 file_path.ensure_tree(outdir)
1271 create_directories(outdir, bundle.files)
Marc-Antoine Rueldcff6462018-12-04 16:35:18 +00001272 _create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001273
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001274 # Ensure working directory exists.
1275 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
1276 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001277
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001278 # Multimap: digest -> list of pairs (path, props).
1279 remaining = {}
1280 for filepath, props in bundle.files.iteritems():
1281 if 'h' in props:
1282 remaining.setdefault(props['h'], []).append((filepath, props))
1283 fetch_queue.wait_on(props['h'])
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001284
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001285 # Now block on the remaining files to be downloaded and mapped.
1286 logging.info('Retrieving remaining files (%d of them)...',
1287 fetch_queue.pending_count)
1288 last_update = time.time()
1289 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1290 while remaining:
1291 detector.ping()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001292
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001293 # Wait for any item to finish fetching to cache.
1294 digest = fetch_queue.wait()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001295
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001296 # Create the files in the destination using item in cache as the
1297 # source.
1298 for filepath, props in remaining.pop(digest):
1299 fullpath = os.path.join(outdir, filepath)
tansell9e04a8d2016-07-28 09:31:59 -07001300
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001301 with cache.getfileobj(digest) as srcfileobj:
1302 filetype = props.get('t', 'basic')
tanselle4288c32016-07-28 09:45:40 -07001303
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001304 if filetype == 'basic':
1305 # Ignore all bits apart from the user.
1306 file_mode = (props.get('m') or 0500) & 0700
1307 if bundle.read_only:
1308 # Enforce read-only if the root bundle does.
1309 file_mode &= 0500
1310 putfile(
1311 srcfileobj, fullpath, file_mode,
1312 use_symlink=use_symlinks)
tanselle4288c32016-07-28 09:45:40 -07001313
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001314 elif filetype == 'tar':
1315 basedir = os.path.dirname(fullpath)
1316 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1317 for ti in t:
1318 if not ti.isfile():
1319 logging.warning(
1320 'Path(%r) is nonfile (%s), skipped',
1321 ti.name, ti.type)
1322 continue
1323 # Handle files created on Windows fetched on POSIX and the
1324 # reverse.
1325 other_sep = '/' if os.path.sep == '\\' else '\\'
1326 name = ti.name.replace(other_sep, os.path.sep)
1327 fp = os.path.normpath(os.path.join(basedir, name))
1328 if not fp.startswith(basedir):
1329 logging.error(
1330 'Path(%r) is outside root directory',
1331 fp)
1332 ifd = t.extractfile(ti)
1333 file_path.ensure_tree(os.path.dirname(fp))
1334 file_mode = ti.mode & 0700
1335 if bundle.read_only:
1336 # Enforce read-only if the root bundle does.
1337 file_mode &= 0500
1338 putfile(ifd, fp, file_mode, ti.size)
tansell26de79e2016-11-13 18:41:11 -08001339
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001340 else:
1341 raise isolated_format.IsolatedError(
1342 'Unknown file type %r', filetype)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001343
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001344 # Report progress.
1345 duration = time.time() - last_update
1346 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1347 msg = '%d files remaining...' % len(remaining)
1348 sys.stdout.write(msg + '\n')
1349 sys.stdout.flush()
1350 logging.info(msg)
1351 last_update = time.time()
1352 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001353
Marc-Antoine Ruele9558372018-08-03 03:41:22 +00001354 # Save the cache right away to not loose the state of the new objects.
1355 cache.save()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001356 # Cache could evict some items we just tried to fetch, it's a fatal error.
1357 if not fetch_queue.verify_all_cached():
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001358 free_disk = file_path.get_free_space(cache.cache_dir)
1359 msg = (
1360 'Cache is too small to hold all requested files.\n'
1361 ' %s\n cache=%dbytes, %d items; %sb free_space') % (
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +00001362 cache.policies, cache.total_size, len(cache), free_disk)
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001363 raise isolated_format.MappingError(msg)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001364 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001365
1366
def _directory_to_metadata(root, algo, blacklist):
  """Yields every file and/or symlink found.

  Yields:
    tuple(FileItem, relpath, metadata)
    For a symlink, FileItem is None.
  """
  root = file_path.get_native_path_case(root)
  # Small files are accumulated into a tar bundle to cut down on the number of
  # individual uploads; see TarBundle.
  bundle = TarBundle(root, algo)
  for relpath, issymlink in isolated_format.expand_directory_and_symlink(
      root,
      u'.' + os.path.sep,
      blacklist,
      follow_symlinks=(sys.platform != 'win32')):

    filepath = os.path.join(root, relpath)
    if issymlink:
      # TODO(maruel): Do not call this.
      yield None, relpath, isolated_format.file_to_metadata(filepath, 0, False)
      continue

    is_isolated = relpath.endswith('.isolated')
    if bundle.try_add(
        FileItem(path=filepath, algo=algo, high_priority=is_isolated)):
      # Absorbed into the pending tarball; it will be yielded once the bundle
      # is flushed.
      continue

    # The file did not fit in the bundle: flush the bundle and start a new one.
    for bundled in bundle.yield_item_path_meta():
      yield bundled
    bundle = TarBundle(root, algo)

    # Yield the file individually.
    item = FileItem(
        path=filepath, algo=algo, size=None, high_priority=is_isolated)
    yield item, relpath, item.meta

  # Flush whatever is left in the final bundle.
  for bundled in bundle.yield_item_path_meta():
    yield bundled
1407
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001408
Marc-Antoine Ruelb69069b2018-11-28 20:50:40 +00001409def _print_upload_stats(items, missing):
1410 """Prints upload stats."""
1411 total = len(items)
1412 total_size = sum(f.size for f in items)
1413 logging.info(
1414 'Total: %6d, %9.1fkiB', total, total_size / 1024.)
1415 cache_hit = set(items).difference(missing)
1416 cache_hit_size = sum(f.size for f in cache_hit)
1417 logging.info(
1418 'cache hit: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1419 len(cache_hit),
1420 cache_hit_size / 1024.,
1421 len(cache_hit) * 100. / total,
1422 cache_hit_size * 100. / total_size if total_size else 0)
1423 cache_miss = missing
1424 cache_miss_size = sum(f.size for f in cache_miss)
1425 logging.info(
1426 'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
1427 len(cache_miss),
1428 cache_miss_size / 1024.,
1429 len(cache_miss) * 100. / total,
1430 cache_miss_size * 100. / total_size if total_size else 0)
1431
1432
def _enqueue_dir(dirpath, blacklist, hash_algo, hash_algo_name):
  """Called by archive_files_to_storage for a directory.

  Create an .isolated file describing the directory content.

  Yields:
    FileItem for every file found, plus one for the .isolated file itself.
  """
  metadata_by_path = {}
  for entry, path, metadata in _directory_to_metadata(
      dirpath, hash_algo, blacklist):
    metadata_by_path[path] = metadata
    # Symlinks only contribute metadata; entry is None for them.
    if entry:
      yield entry

  # TODO(maruel): If there's no file, don't yield an .isolated file.
  isolated_data = {
      'algo': hash_algo_name,
      'files': metadata_by_path,
      'version': isolated_format.ISOLATED_FILE_VERSION,
  }
  # .isolated files are small, so generating them fully in memory is fine.
  yield BufferItem(
      tools.format_json(isolated_data, True), algo=hash_algo,
      high_priority=True)
Marc-Antoine Rueld0868ec2018-11-28 20:47:29 +00001459
1460
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry into remote storage and returns stats.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: iterable of files to upload. If a directory is specified (with a
        trailing slash), a .isolated file is created and its hash is returned.
        Duplicates are skipped.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the .isolated file.
    'cold' items are the ones whose digest was reported back by
    storage.upload_items() (presumably the ones that had to be uploaded);
    'hot' the rest.
  """
  # Dict of path to hash.
  results = collections.OrderedDict()
  hash_algo = storage.server_ref.hash_algo
  hash_algo_name = storage.server_ref.hash_algo_name
  # Generator of FileItem to pass to upload_items() concurrent operation.
  channel = threading_utils.TaskChannel()
  # Digests reported back by storage.upload_items(); filled in by the
  # background thread below and only read after t.join().
  uploaded_digests = set()
  def _upload_items():
    # NOTE: this local 'results' intentionally shadows the outer OrderedDict;
    # it holds whatever upload_items() returns for this channel.
    results = storage.upload_items(channel)
    uploaded_digests.update(f.digest for f in results)
  t = threading.Thread(target=_upload_items)
  t.start()

  # Keep track locally of the items to determine cold and hot items.
  items_found = []
  try:
    for f in files:
      assert isinstance(f, unicode), repr(f)
      if f in results:
        # Duplicate
        continue
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          item = None
          for item in _enqueue_dir(
              filepath, blacklist, hash_algo, hash_algo_name):
            channel.send_result(item)
            items_found.append(item)
          # The very last item will be the .isolated file.
          if not item:
            # There was no file in the directory.
            continue
        elif fs.isfile(filepath):
          item = FileItem(
              path=filepath,
              algo=hash_algo,
              size=None,
              high_priority=f.endswith('.isolated'))
          channel.send_result(item)
          items_found.append(item)
        else:
          raise Error('%s is neither a file or directory.' % f)
        # 'item' is the file's FileItem, or the directory's .isolated item.
        results[f] = item.digest
      except OSError:
        raise Error('Failed to process %s.' % f)
  finally:
    # Stops the generator, so _upload_items() can exit.
    channel.send_done()
    t.join()

  cold = []
  hot = []
  for i in items_found:
    # Note that multiple FileItem may have the same .digest.
    if i.digest in uploaded_digests:
      cold.append(i)
    else:
      hot.append(i)
  return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001536
1537
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  if files == ['-']:
    # The magic path '-' means the list of files is read from stdin, one path
    # per line.
    files = (line.rstrip('\n\r') for line in sys.stdin)
  if not files:
    parser.error('Nothing to upload')
  files = (path.decode('utf-8') for path in files)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(server_ref) as storage:
      results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
  except (Error, local_caching.NoMoreSpace) as exc:
    parser.error(exc.args[0])
  print('\n'.join('%s %s' % (h, f) for f, h in results.iteritems()))
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001569
1570
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.warning('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  # Exactly one of --isolated or --file must be specified.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks require the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    # Refuse to extract a whole tree over an existing file or a non-empty
    # directory.
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  with get_storage(server_ref) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      # Maps digest -> destination path, drained as fetches complete.
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(
                local_caching.file_write, os.path.join(options.target, dest)))
      while pending:
        # Each completed fetch yields its digest; map it back to the path.
        fetched = channel.next()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        # NOTE(review): 'rel' is already joined with options.target, so the
        # second os.path.join() below is redundant (joining onto an absolute
        # path returns the absolute path unchanged); kept as-is to preserve
        # behavior.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' %
              os.path.join(options.target, rel))
        print(' ' + ' '.join(bundle.command))

  return 0
1651
1652
def add_archive_options(parser):
  """Adds the archive related command line options to |parser|."""
  parser.add_option(
      '--blacklist',
      action='append',
      default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
1659
1660
def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL',
      default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy',
      help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace',
      default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
1674
1675
def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    # Nothing to validate when the option is absent and optional.
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    # Route all Isolate traffic through the gRPC proxy.
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as exc:
      parser.error('--isolate-server %s' % exc)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as exc:
    parser.error(str(exc))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001700
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001701
def add_cache_options(parser):
  """Adds the local cache management command line options to |parser|."""
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache', metavar='DIR', default='cache',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  group.add_option(
      '--max-cache-size', type='int', metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space', type='int', metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items', type='int', metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache '
           'default=%default')
  parser.add_option_group(group)
1729
1730
def process_cache_options(options, trim, **kwargs):
  """Returns a content addressed cache instance based on |options|.

  Uses a disk backed cache when --cache was given, an in-memory one otherwise.
  """
  if not options.cache:
    return local_caching.MemoryContentAddressedCache()

  policies = local_caching.CachePolicies(
      options.max_cache_size,
      options.min_free_space,
      options.max_items,
      # 3 weeks.
      max_age_secs=21*24*60*60)
  # |options.cache| path may not exist until DiskContentAddressedCache()
  # instance is created.
  return local_caching.DiskContentAddressedCache(
      unicode(os.path.abspath(options.cache)), policies, trim, **kwargs)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001746
1747
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  """Option parser with logging and authentication options built in."""

  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    # Registers the authentication options; they are consumed in parse_args()
    # below via auth.process_auth_options().
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    # Parses the command line, then applies the authentication configuration.
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args
1762
1763
def main(args):
  """Dispatches |args| to the matching CMD* handler in this module."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001767
1768
if __name__ == '__main__':
  # Keeps failures on the console instead of OS-level error reporting UIs.
  subprocess42.inhibit_os_error_reporting()
  # Makes stdout/stderr encoding sane before anything is printed.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # Enables ANSI color escape handling on Windows consoles.
  colorama.init()
  sys.exit(main(sys.argv[1:]))