#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.8.6'

import collections
import errno
import functools
import logging
import optparse
import os
import re
import signal
import stat
import sys
import tarfile
import tempfile
import time
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
import local_caching


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to the log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
    # Temporary vim or python files.
    r'^.+\.(?:pyc|swp)$',
    # .git or .svn directory.
    r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)


class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data


def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file-like object was created by something outside our control (for
  # example the standard library's open("test.txt")), name will be a str. We
  # want all our paths to be unicode objects, so decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name


# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size - written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)
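
# Example (illustrative sketch, not part of the original module): copying an
# exact number of bytes between in-memory file objects; a short source raises
# IOError as documented above.
#   import StringIO
#   dst = StringIO.StringIO()
#   fileobj_copy(dst, StringIO.StringIO('abcdefg'), size=5)
#   assert dst.getvalue() == 'abcde'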


def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports suggest ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only, we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so an explicit check is needed.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
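
# Example (hedged sketch): materializing a cached file into an output tree;
# |cached_path| and |outdir| are hypothetical names, and 0500 mirrors the
# read-only mode used by fetch_isolated() below.
#   with fs.open(cached_path, 'rb') as src:
#     putfile(src, os.path.join(outdir, u'data.bin'), file_mode=0500)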


def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail


def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb file doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')
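
# Example (sketch): zip_compress and zip_decompress form a lossless round trip
# over arbitrarily chunked content.
#   chunks = ['hello ', 'world']
#   blob = ''.join(zip_compress(chunks, level=7))
#   assert ''.join(zip_decompress([blob])) == 'hello world'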


def get_zip_compression_level(filename):
  """Given a filename, calculates the ideal zip compression level to use."""
  # Strip the leading dot so the extension matches the entries in
  # ALREADY_COMPRESSED_TYPES ('7z', not '.7z'); with the dot kept the
  # membership test below could never succeed.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Builds the set of all directories to create, including intermediate
  # parents.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)
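
# For example, with files ['a/b/c.txt', 'a/d.txt'] the set of directories
# becomes {'a', 'a/b'}; the sorted order guarantees 'a' is created before
# 'a/b'.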


def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise


class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]
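
# Example (sketch; the path is hypothetical): items ready for upload_items()
# below. Digests are computed lazily via item.prepare() inside Storage.
#   items = [
#       FileItem(u'/path/to/test_binary', high_priority=True),
#       BufferItem('{"version": 1}'),
#   ]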


class Storage(object):
  """Efficiently downloads or uploads large sets of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  the signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a bunch of items to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
          upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    # TODO(maruel): Stop serializing the list.
    logging.info('upload_items(items=%d)', len(items))

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self.server_ref.hash_algo)

    # For each digest keep only the first isolate_storage.Item that matches
    # it. All other items are just indistinguishable copies from the point of
    # view of the isolate server (it doesn't care about paths at all, only
    # content and digests).
    seen = {}
    duplicates = 0
    for item in items:
      if seen.setdefault(item.digest, item) is not item:
        duplicates += 1
    items = seen.values()
    if duplicates:
      logging.info('Skipped %d files with duplicated content', duplicates)

    # Enqueue all upload tasks.
    missing = set()
    uploaded = []
    channel = threading_utils.TaskChannel()
    for missing_item, push_state in self.get_missing_items(items):
      missing.add(missing_item)
      self.async_push(channel, missing_item, push_state)

    # No need to spawn deadlock detector thread if there's nothing to upload.
    if missing:
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        # Wait for all started uploads to finish.
        while len(uploaded) != len(missing):
          detector.ping()
          item = channel.pull()
          uploaded.append(item)
          logging.debug(
              'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
      logging.info('All files are uploaded')

    # Print stats.
    total = len(items)
    total_size = sum(f.size for f in items)
    logging.info(
        'Total:      %6d, %9.1fkb',
        total,
        total_size / 1024.)
    cache_hit = set(items) - missing
    cache_hit_size = sum(f.size for f in cache_hit)
    logging.info(
        'cache hit:  %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_hit),
        cache_hit_size / 1024.,
        len(cache_hit) * 100. / total,
        cache_hit_size * 100. / total_size if total_size else 0)
    cache_miss = missing
    cache_miss_size = sum(f.size for f in cache_miss)
    logging.info(
        'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
        len(cache_miss),
        cache_miss_size / 1024.,
        len(cache_miss) * 100. / total,
        cache_miss_size * 100. / total_size if total_size else 0)

    return uploaded

  def async_push(self, channel, item, push_state):
    """Starts an asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on the server with
    a 'get_missing_items' call. 'get_missing_items' returns a |push_state|
    object that contains storage specific information describing how to upload
    the item (for example, in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self.server_ref.hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts an asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare the reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self.server_ref.is_with_compression:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through a verifier that will assert its size.
        verifier = FetchStreamVerifier(
            stream, self.server_ref.hash_algo, digest, size)
        # The verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)

  def get_missing_items(self, items):
    """Yields items that are missing from the server.

    Issues multiple parallel queries via StorageApi's 'contains' method.

    Arguments:
      items: a list of isolate_storage.Item objects to check.

    Yields:
      For each missing item it yields a pair (item, push_state), where:
        * item - isolate_storage.Item object that is missing (one of |items|).
        * push_state - opaque object that contains storage specific
            information describing how to upload the item (for example, in
            case of cloud storage, it is signed upload URLs). It can later be
            passed to 'async_push'.
    """
    channel = threading_utils.TaskChannel()
    pending = 0

    # Ensure all digests are calculated.
    for item in items:
      item.prepare(self.server_ref.hash_algo)

    def contains(batch):
      if self._aborted:
        raise Aborted()
      return self._storage_api.contains(batch)

    # Enqueue all requests.
    for batch in batch_items_for_check(items):
      self.net_thread_pool.add_task_with_channel(
          channel, threading_utils.PRIORITY_HIGH, contains, batch)
      pending += 1

    # Yield results as they come in.
    for _ in xrange(pending):
      for missing_item, push_state in channel.pull().iteritems():
        yield missing_item, push_state


def batch_items_for_check(items):
  """Splits a list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of isolate_storage.Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of isolate_storage.Item objects.
  """
  batch_count = 0
  batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
  next_queries = []
  for item in sorted(items, key=lambda x: x.size, reverse=True):
    next_queries.append(item)
    if len(next_queries) == batch_size_limit:
      yield next_queries
      next_queries = []
      batch_count += 1
      batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
          min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if next_queries:
    yield next_queries
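
# Example of the resulting schedule (a sketch; _FakeItem stands in for
# isolate_storage.Item, which only needs a .size attribute here):
#   class _FakeItem(object):
#     def __init__(self, size):
#       self.size = size
#   batches = list(batch_items_for_check([_FakeItem(n) for n in xrange(300)]))
#   assert [len(b) for b in batches] == [20, 20, 50, 50, 50, 100, 10]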


class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use; verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update the item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if the item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, the digest is removed from the waited-on set and
    returned.

    This function is called in two waves: first for the HIGH priority items
    (the isolated files themselves), then for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to the assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds a local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns the number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)


class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
      * stream: an iterable yielding chunks of content.
      * hasher: an object from hashlib that supports update() and hexdigest()
          (eg, hashlib.sha1).
      * expected_digest: if the entire stream is piped through hasher and then
          summarized via hexdigest(), this should be the result. That is, it
          should be a hex string like 'abc123'.
      * expected_size: either the expected size of the stream, or
          local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding the last chunk to the
    consumer.

    Also wraps IOError produced by the consumer into MappingError exceptions
    since otherwise Storage will retry the fetch on unrelated local cache
    errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing the last chunk
    # to the consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to the consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)
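
# Example (sketch): verifying a fully in-memory stream against its digest.
#   import hashlib
#   blob = 'x' * 100
#   v = FetchStreamVerifier([blob], hashlib.sha1,
#                           hashlib.sha1(blob).hexdigest(), 100)
#   assert ''.join(v.run()) == blob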


class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self, filter_cb):
    """
    filter_cb: callback function to filter downloaded content.
        When filter_cb is not None, a file is downloaded only if
        filter_cb(filepath) returns True.
    """

    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, an IsolatedFile instance.
    self.root = None

    self._filter_cb = filter_cb

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left side of the tree have been fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long chain of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts an asynchronous fetch of all data
    files by adding them to |fetch_queue|. It doesn't wait for data files to
    finish fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      """Retrieves an isolated file included by the root bundle."""
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      # This isolated item is being added dynamically, notify FetchQueue.
      fetch_queue.wait_on(h)
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait()
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until the next to-be-processed node loads. "Waiting" is done by
      # yielding back to the outer loop, which waits until some *.isolated is
      # loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      if self._filter_cb and not self._filter_cb(filepath):
        continue

      # The root isolated has priority on the files being mapped. In
      # particular, overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # If the isolated is read only, make sure the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from a loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    the isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']


def get_storage(server_ref):
  """Returns a Storage instance that can upload and download from |server_ref|.

  Arguments:
    server_ref: isolate_storage.ServerRef instance.

  Returns:
    Instance of Storage.
  """
  assert isinstance(server_ref, isolate_storage.ServerRef), repr(server_ref)
  return Storage(isolate_storage.get_storage_api(server_ref))
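
# Example usage (a sketch; assumes isolate_storage.ServerRef is constructed
# from a server URL and a namespace, as the rest of this file implies):
#   server_ref = isolate_storage.ServerRef(
#       'https://isolateserver.appspot.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     storage.upload_items([BufferItem('hello world')])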
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001035
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001036
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001037def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1038 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001039 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001040
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001041 Arguments:
1042 isolated_hash: hash of the root *.isolated file.
1043 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001044 cache: ContentAddressedCache class that knows how to store and map files
1045 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001046 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001047 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001048 filter_cb: filter that works as whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001049
1050 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001051 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001052 """
  logging.debug(
      'fetch_isolated(%s, %s, %s, %s, %s)',
      isolated_hash, storage, cache, outdir, use_symlinks)
  # Hash algorithm to use, defined by the namespace |storage| is using.
  algo = storage.server_ref.hash_algo
  fetch_queue = FetchQueue(storage, cache)
  bundle = IsolatedBundle(filter_cb)

  with tools.Profiler('GetIsolateds'):
    # Optionally support local files by manually adding them to cache.
    if not isolated_format.is_valid_hash(isolated_hash, algo):
      logging.debug('%s is not a valid hash, assuming a file '
                    '(algo was %s, hash size was %d)',
                    isolated_hash, algo(), algo().digest_size)
      path = unicode(os.path.abspath(isolated_hash))
      try:
        isolated_hash = fetch_queue.inject_local_file(path, algo)
      except IOError as e:
        raise isolated_format.MappingError(
            '%s doesn\'t seem to be a valid file. Did you intend to pass a '
            'valid hash (error: %s)?' % (isolated_hash, e))

    # Load all *.isolated and start loading the rest of the files.
    bundle.fetch(fetch_queue, isolated_hash, algo)

  with tools.Profiler('GetRest'):
    # Create file system hierarchy.
    file_path.ensure_tree(outdir)
    create_directories(outdir, bundle.files)
    create_symlinks(outdir, bundle.files.iteritems())

    # Ensure working directory exists.
    cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
    file_path.ensure_tree(cwd)

    # Multimap: digest -> list of pairs (path, props).
    remaining = {}
    for filepath, props in bundle.files.iteritems():
      if 'h' in props:
        remaining.setdefault(props['h'], []).append((filepath, props))
        fetch_queue.wait_on(props['h'])

    # Now block on the remaining files to be downloaded and mapped.
    logging.info('Retrieving remaining files (%d of them)...',
                 fetch_queue.pending_count)
    last_update = time.time()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
      while remaining:
        detector.ping()

        # Wait for any item to finish fetching to cache.
        digest = fetch_queue.wait()

        # Create the files in the destination using item in cache as the
        # source.
        for filepath, props in remaining.pop(digest):
          fullpath = os.path.join(outdir, filepath)

          with cache.getfileobj(digest) as srcfileobj:
            filetype = props.get('t', 'basic')

            if filetype == 'basic':
              # Ignore all bits apart from the user.
              file_mode = (props.get('m') or 0500) & 0700
              if bundle.read_only:
                # Enforce read-only if the root bundle does.
                file_mode &= 0500
              putfile(
                  srcfileobj, fullpath, file_mode,
                  use_symlink=use_symlinks)

            elif filetype == 'tar':
              basedir = os.path.dirname(fullpath)
              with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
                for ti in t:
                  if not ti.isfile():
                    logging.warning(
                        'Path(%r) is nonfile (%s), skipped',
                        ti.name, ti.type)
                    continue
                  # Handle files created on Windows fetched on POSIX and the
                  # reverse.
                  other_sep = '/' if os.path.sep == '\\' else '\\'
                  name = ti.name.replace(other_sep, os.path.sep)
                  fp = os.path.normpath(os.path.join(basedir, name))
                  if not fp.startswith(basedir):
                    # Skip entries that would escape the destination
                    # directory, e.g. via '..' path components.
                    logging.error(
                        'Path(%r) is outside root directory',
                        fp)
                    continue
                  ifd = t.extractfile(ti)
                  file_path.ensure_tree(os.path.dirname(fp))
                  file_mode = ti.mode & 0700
                  if bundle.read_only:
                    # Enforce read-only if the root bundle does.
                    file_mode &= 0500
                  putfile(ifd, fp, file_mode, ti.size)

            else:
              raise isolated_format.IsolatedError(
                  'Unknown file type %r' % filetype)

        # Report progress.
        duration = time.time() - last_update
        if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
          msg = '%d files remaining...' % len(remaining)
          sys.stdout.write(msg + '\n')
          sys.stdout.flush()
          logging.info(msg)
          last_update = time.time()
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

  # Save the cache right away to not lose the state of the new objects.
  cache.save()
  # Cache could evict some items we just tried to fetch, it's a fatal error.
  if not fetch_queue.verify_all_cached():
    free_disk = file_path.get_free_space(cache.cache_dir)
    msg = (
        'Cache is too small to hold all requested files.\n'
        '  %s\n  cache=%d bytes, %d items; %s bytes free') % (
          cache.policies, cache.total_size, len(cache), free_disk)
    raise isolated_format.MappingError(msg)
  return bundle


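# A minimal sketch of fetching a whole tree (the URL, hash and outdir are
# placeholders; a real digest of a *.isolated file is expected):
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolate.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     bundle = fetch_isolated(
#         isolated_hash=u'deadbeefdeadbeefdeadbeefdeadbeefdeadbeef',
#         storage=storage,
#         cache=local_caching.MemoryContentAddressedCache(),
#         outdir=u'/tmp/out',
#         use_symlinks=False)

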
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  paths = isolated_format.expand_directory_and_symlink(
      root, u'.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {
    relpath: isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo, False)
    for relpath in paths
  }
  for v in metadata.itervalues():
    v.pop('t')
  items = [
      FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          high_priority=relpath.endswith('.isolated'))
      for relpath, meta in metadata.iteritems() if 'h' in meta
  ]
  return items, metadata


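# Illustrative sketch only; the path and blacklist are placeholders, and the
# algo is assumed to be the server's hash class (e.g. hashlib.sha1):
#
#   items, metadata = directory_to_metadata(
#       u'/path/to/dir', hashlib.sha1, tools.gen_blacklist([r'.*\.pyc$']))

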
def _enqueue_dir(dirpath, blacklist, tempdir, hash_algo, hash_algo_name):
  """Called by archive_files_to_storage for a directory.

  Creates an .isolated file describing it.
  """
  items, metadata = directory_to_metadata(dirpath, hash_algo, blacklist)
  # TODO(maruel): Stop putting to disk.
  handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
  os.close(handle)
  data = {
    'algo': hash_algo_name,
    'files': metadata,
    'version': isolated_format.ISOLATED_FILE_VERSION,
  }
  isolated_format.save_isolated(isolated, data)
  h = isolated_format.hash_file(isolated, hash_algo)
  items.append(
      FileItem(
          path=isolated,
          digest=h,
          size=fs.stat(isolated).st_size,
          high_priority=True))
  return items, h


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry into remote storage and returns stats.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: iterable of files to upload. If a directory is specified (with a
      trailing slash), a .isolated file is created and its hash is returned.
      Duplicates are skipped.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
  """
  # Dict of path to hash.
  results = collections.OrderedDict()
  # TODO(maruel): Stop needing a temporary directory.
  tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
  try:
    items_to_upload = []
    hash_algo = storage.server_ref.hash_algo
    hash_algo_name = storage.server_ref.hash_algo_name
    for f in files:
      assert isinstance(f, unicode), repr(f)
      if f in results:
        # Duplicate
        continue
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory.
          items, h = _enqueue_dir(
              filepath, blacklist, tempdir, hash_algo, hash_algo_name)
          items_to_upload.extend(items)
        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, hash_algo)
          logging.info('- %s: %s', f, h)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
        results[f] = h
      except OSError:
        raise Error('Failed to process %s.' % f)

    uploaded = storage.upload_items(items_to_upload)
    # upload_items() returns the items that actually had to be uploaded
    # (cold); the rest were already on the server (hot). Use a set for O(1)
    # membership tests.
    uploaded = set(f.digest for f in uploaded)
    cold = []
    hot = []
    for i in items_to_upload:
      if i.digest in uploaded:
        cold.append(i)
      else:
        hot.append(i)
    return results, cold, hot
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)


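# A rough usage sketch (paths are placeholders; server_ref as in the
# get_storage() example above):
#
#   with get_storage(server_ref) as storage:
#     results, cold, hot = archive_files_to_storage(
#         storage, [u'/path/to/dir/', u'/path/to/file'],
#         tools.gen_blacklist([]))
#   # results maps each input path to its content hash.

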
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  if files == ['-']:
    files = (l.rstrip('\n\r') for l in sys.stdin)
  if not files:
    parser.error('Nothing to upload')
  files = (f.decode('utf-8') for f in files)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(server_ref) as storage:
      results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
  except (Error, local_caching.NoMoreSpace) as e:
    parser.error(e.args[0])
  print('\n'.join('%s %s' % (h, f) for f, h in results.iteritems()))
  return 0


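# Example invocation (sketch; the server URL is a placeholder):
#
#   python isolateserver.py archive \
#       -I https://isolate.example.com some/directory/
#
# Prints one '<hash> <path>' line per processed entry.

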
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, '
           'use --file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append',
      nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.error('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  with get_storage(server_ref) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(
                local_caching.file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        # |rel| is already absolute since |options.target| is.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0


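# Example invocation (sketch; URL and hash are placeholders):
#
#   python isolateserver.py download \
#       -I https://isolate.example.com \
#       -s deadbeefdeadbeefdeadbeefdeadbeefdeadbeef -t out
#
# Use -f '<hash> <dest>' instead of -s to fetch individual files.

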
def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexp to use as blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if options.grpc_proxy:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)
  else:
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as e:
      parser.error('--isolate-server %s' % e)
  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)
  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as e:
    parser.error(str(e))


def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR', default='cache',
      help='Directory to keep a local cache of the files. Accelerates '
           'download by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, trim, **kwargs):
  if options.cache:
    policies = local_caching.CachePolicies(
        options.max_cache_size,
        options.min_free_space,
        options.max_items,
        # 3 weeks.
        max_age_secs=21*24*60*60)

    # |options.cache| path may not exist until DiskContentAddressedCache()
    # instance is created.
    server_ref = isolate_storage.ServerRef(
        options.isolate_server, options.namespace)
    return local_caching.DiskContentAddressedCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        server_ref.hash_algo(),  # pylint: disable=not-callable
        trim,
        **kwargs)
  else:
    return local_caching.MemoryContentAddressedCache()


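# A rough sketch of how the option helpers compose (values are placeholders;
# --isolate-server must be set since the cache keys off the server's
# namespace):
#
#   parser = OptionParserIsolateServer()
#   add_isolate_server_options(parser)
#   add_cache_options(parser)
#   options, _ = parser.parse_args(
#       ['-I', 'https://isolate.example.com', '--cache', 'my-cache'])
#   cache = process_cache_options(options, trim=True)

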
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))