blob: aea6f8db2446cc2165b0033594f1ca9dc14c2023 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruelea586f32016-04-05 11:11:33 -07002# Copyright 2013 The LUCI Authors. All rights reserved.
maruelf1f5e2a2016-05-25 17:10:39 -07003# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -04008__version__ = '0.8.5'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
nodir90bc8dc2016-06-15 13:35:21 -070010import errno
tansell9e04a8d2016-07-28 09:31:59 -070011import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040013import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040016import signal
tansell9e04a8d2016-07-28 09:31:59 -070017import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import sys
tansell26de79e2016-11-13 18:41:11 -080019import tarfile
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050020import tempfile
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021import time
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000022import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000023
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000024from third_party import colorama
25from third_party.depot_tools import fix_encoding
26from third_party.depot_tools import subcommand
27
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050028from utils import file_path
maruel12e30012015-10-09 11:55:35 -070029from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040030from utils import logging_utils
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000031from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040032from utils import on_error
maruel8e4e40c2016-05-30 06:21:07 -070033from utils import subprocess42
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000034from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000035from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000036
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080037import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040038import isolated_format
aludwin81178302016-11-30 17:18:49 -080039import isolate_storage
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -040040import local_caching
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080041
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000042
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousands files
# to look up with minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Regexps of relative paths that are skipped by default when archiving.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
87
88
class Error(Exception):
  """Base class for runtime errors raised by this module."""
92
93
class Aborted(Error):
  """Raised when an in-flight operation was cancelled via abort()."""
97
98
class AlreadyExists(Error):
  """Raised when a file that is being created already exists on disk."""
101
102
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields the content of the file at |path| in chunks.

  Each chunk is at most |chunk_size| bytes; reading starts at byte |offset|.
  """
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    chunk = f.read(chunk_size)
    while chunk:
      yield chunk
      chunk = f.read(chunk_size)
113
114
def fileobj_path(fileobj):
  """Returns the file system path backing a file-like object, or None.

  A returned path is guaranteed to exist and can be passed to file system
  operations like copy. Returns None when no usable path is available.
  """
  path = getattr(fileobj, 'name', None)
  if path is None:
    return None

  # Objects created by open("test.txt") (or by stdlib code outside our
  # control) carry a str name. All paths in this code base are unicode, so
  # decode it. We incorrectly assume UTF-8 is used everywhere.
  if not isinstance(path, unicode):
    path = path.decode('utf-8')

  # fs.exists fails with an assertion error on relative paths, so give up on
  # those early.
  if not os.path.isabs(path):
    return None

  return path if fs.exists(path) else None
140
141
142# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
143# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copies data from |srcfileobj| into |dstfileobj|.

  When |size| is given, exactly that many bytes are copied; an IOError is
  raised if the source runs out of data first. With the default of -1,
  everything up to the EOF marker is copied.
  """
  # With size == -1 the source must be positioned at its start, otherwise the
  # copy would silently be partial.
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  copied = 0
  while copied != size:
    want = chunk_size if size <= 0 else min(chunk_size, size - copied)
    chunk = srcfileobj.read(want)
    if not chunk:
      if size != -1:
        raise IOError('partial file, got %s, wanted %s' % (copied, size))
      break
    dstfileobj.write(chunk)
    copied += len(chunk)
169
170
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  Arguments:
    srcfileobj: source file-like object; when it is backed by a real read-only
        file, the content is linked instead of copied.
    dstpath: destination path to create.
    file_mode: optional permission bits to apply to dstpath. Note that a mode
        of 0 is valid and is honored.
    size: exact number of bytes to copy, or -1 for "until EOF".
    use_symlink: prefer symlinking over hardlinking when linking is possible.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, cause a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlink per
    partition. This is why the function automatically fallbacks to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal report that ext2 is known to be potentially faulty on high rate
    of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # BUG FIX: the predicate was inverted; it evaluated truthy exactly when a
    # write bit WAS set, so writable files were linked (letting a task corrupt
    # the cache through the link) and read-only files were needlessly copied.
    # The file is read only when no write permission bit is set.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
233
234
def zip_compress(content_generator, level=7):
  """Yields zlib-compressed chunks for the data read from |content_generator|.

  Empty intermediate outputs are skipped; the stream is finalized with
  Z_FINISH so the concatenated output is a complete zlib stream.
  """
  deflater = zlib.compressobj(level)
  for piece in content_generator:
    out = deflater.compress(piece)
    if out:
      yield out
  final = deflater.flush(zlib.Z_FINISH)
  if final:
    yield final
245
246
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Yields decompressed data for the zlib stream in |content_generator|.

  Output is produced in pieces no larger than |chunk_size| so that a zip bomb
  cannot force zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  consumed = 0
  try:
    for piece in content_generator:
      consumed += len(piece)
      pending = piece
      # Keep feeding the unconsumed tail back until this input is drained.
      while True:
        out = inflater.decompress(pending, chunk_size)
        if out:
          yield out
        pending = inflater.unconsumed_tail
        if not pending:
          break
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (consumed, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
277
278
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use.

  Returns 0 (store only) for file types that are already compressed and 7
  for everything else.
  """
  # BUG FIX: os.path.splitext() keeps the leading dot ('.jpg') while
  # ALREADY_COMPRESSED_TYPES lists bare extensions ('jpg'), so the membership
  # test never matched and everything was recompressed. Strip the dot.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
284
285
def create_directories(base_directory, files):
  """Creates the directory tree needed to hold the given relative files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file, walking each path up to
  # the root of the relative tree.
  dirs = set()
  for f in files:
    parent = os.path.dirname(f)
    while parent and parent not in dirs:
      dirs.add(parent)
      parent = os.path.dirname(parent)
  # Sorted order guarantees parents are created before their children.
  for rel in sorted(dirs):
    target = os.path.join(base_directory, rel)
    if not fs.isdir(target):
      fs.mkdir(target)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000300
301
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of (path, properties) pairs.

  Entries without an 'l' property are ignored. On Windows no symlink is
  created; a warning is logged instead.

  Raises AlreadyExists when the link path is already present.
  """
  wanted = ((p, props['l']) for p, props in files if 'l' in props)
  for filepath, link_target in wanted:
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(link_target, outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise
      raise AlreadyExists('File %s already exists.' % outfile)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000318
319
class FileItem(isolate_storage.Item):
  """An on-disk file to be pushed to Storage.

  The digest and size can be supplied up front when already known; otherwise
  the size is read via stat() here and the digest is derived later from the
  file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    if size is None:
      size = fs.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    # Streams the file from disk chunk by chunk instead of loading it whole.
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000337
338
class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # Digest is left as None; it is computed later via prepare() (see
    # Storage.upload_items, which calls item.prepare()).
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    # The whole buffer as a single chunk.
    return [self.buffer]
348
349
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000350class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800351 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000352
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800353 Implements compression support, parallel 'contains' checks, parallel uploads
354 and more.
355
356 Works only within single namespace (and thus hashing algorithm and compression
357 scheme are fixed).
358
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400359 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
360 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800361 """
362
  def __init__(self, storage_api):
    """Wraps a StorageApi instance.

    Arguments:
      storage_api: object performing the raw storage calls; its |namespace|
          defines the hashing scheme and whether compression is used.
    """
    self._storage_api = storage_api
    # Client-side zipping is skipped either when the namespace does not use
    # compression or when the storage implementation compresses internally.
    self._use_zip = isolated_format.is_namespace_with_compression(
        storage_api.namespace) and not storage_api.internal_compression
    self._hash_algo = isolated_format.get_hash_algo(storage_api.namespace)
    # Thread pools are created lazily by the corresponding properties.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Set by abort(); read/written without locking (see comment in abort()).
    self._aborted = False
    # Signal handlers saved by __enter__ and restored by __exit__.
    self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000372
  @property
  def hash_algo(self):
    """Hashing algorithm used to name files in storage based on their content.

    Defined by |namespace|; computed once in __init__ from the storage_api
    namespace. See also isolated_format.get_hash_algo().
    """
    return self._hash_algo
380
  @property
  def location(self):
    """URL of the backing store that this class is using.

    Delegates to the underlying storage_api object.
    """
    return self._storage_api.location
385
  @property
  def namespace(self):
    """Isolate namespace used by this storage.

    Indirectly defines hashing scheme and compression method used. Delegates
    to the underlying storage_api object.
    """
    return self._storage_api.namespace
393
  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping.

    Created lazily on first access and torn down by close().
    """
    if self._cpu_thread_pool is None:
      # Always keep at least 2 workers, even on single-core machines.
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000404
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000405 @property
406 def net_thread_pool(self):
407 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
408 if self._net_thread_pool is None:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700409 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000410 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000411
  def close(self):
    """Waits for all pending tasks to finish.

    Both thread pools are joined, closed and reset to None, so the lazy
    properties would recreate them if the instance were used again.
    """
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')
424
425 def abort(self):
426 """Cancels any pending or future operations."""
427 # This is not strictly theadsafe, but in the worst case the logging message
428 # will be printed twice. Not a big deal. In other places it is assumed that
429 # unprotected reads and writes to _aborted are serializable (it is true
430 # for python) and thus no locking is used.
431 if not self._aborted:
432 logging.warning('Aborting... It can take a while.')
433 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000434
  def __enter__(self):
    """Context manager interface.

    Installs SIGINT/SIGTERM handlers that call abort(); the previous handlers
    are saved so that __exit__ can restore them.
    """
    # Re-entering without a matching __exit__ would clobber the saved
    # handlers, hence the assert.
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self
441
  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface.

    Waits for pending work via close() and restores the signal handlers that
    __enter__ replaced. Returns False so exceptions are never suppressed.
    """
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False
449
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000450 def upload_items(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800451 """Uploads a bunch of items to the isolate server.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000452
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800453 It figures out what items are missing from the server and uploads only them.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000454
455 Arguments:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400456 items: list of isolate_storage.Item instances that represents data to
457 upload.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000458
459 Returns:
460 List of items that were uploaded. All other items are already there.
461 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700462 logging.info('upload_items(items=%d)', len(items))
463
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800464 # Ensure all digests are calculated.
465 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700466 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800467
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400468 # For each digest keep only first isolate_storage.Item that matches it. All
469 # other items are just indistinguishable copies from the point of view of
470 # isolate server (it doesn't care about paths at all, only content and
471 # digests).
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000472 seen = {}
473 duplicates = 0
474 for item in items:
475 if seen.setdefault(item.digest, item) is not item:
476 duplicates += 1
477 items = seen.values()
478 if duplicates:
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700479 logging.info('Skipped %d files with duplicated content', duplicates)
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000480
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000481 # Enqueue all upload tasks.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000482 missing = set()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000483 uploaded = []
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800484 channel = threading_utils.TaskChannel()
485 for missing_item, push_state in self.get_missing_items(items):
486 missing.add(missing_item)
487 self.async_push(channel, missing_item, push_state)
488
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000489 # No need to spawn deadlock detector thread if there's nothing to upload.
490 if missing:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700491 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000492 # Wait for all started uploads to finish.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000493 while len(uploaded) != len(missing):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000494 detector.ping()
495 item = channel.pull()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000496 uploaded.append(item)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000497 logging.debug(
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000498 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000499 logging.info('All files are uploaded')
500
501 # Print stats.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000502 total = len(items)
503 total_size = sum(f.size for f in items)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000504 logging.info(
505 'Total: %6d, %9.1fkb',
506 total,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000507 total_size / 1024.)
508 cache_hit = set(items) - missing
509 cache_hit_size = sum(f.size for f in cache_hit)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000510 logging.info(
511 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
512 len(cache_hit),
513 cache_hit_size / 1024.,
514 len(cache_hit) * 100. / total,
515 cache_hit_size * 100. / total_size if total_size else 0)
516 cache_miss = missing
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000517 cache_miss_size = sum(f.size for f in cache_miss)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000518 logging.info(
519 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
520 len(cache_miss),
521 cache_miss_size / 1024.,
522 len(cache_miss) * 100. / total,
523 cache_miss_size * 100. / total_size if total_size else 0)
524
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000525 return uploaded
526
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      # Bail out quickly when abort() was called; the Aborted exception
      # propagates through the thread pool to the caller.
      if self._aborted:
        raise Aborted()
      item.prepare(self._hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self._use_zip:
      self.net_thread_pool.add_task_with_channel(channel, priority, push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        # Deliver the failure to whoever is pulling from |channel| instead of
        # dying silently on the worker thread.
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not same as the
      # one provided by 'item'. Since '[data]' is a list, it can safely be
      # reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000581
  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    'async_push' with instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    # DeadlockDetector dumps all thread stacks to the log if the upload takes
    # longer than DEADLOCK_TIMEOUT (see comment on that constant).
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self.async_push(channel, item, push_state)
      pushed = channel.pull()
      assert pushed is item
    return item
601
  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).

    Returns:
      None, but |channel| later receives back |digest| (or the exception) when
      the download ends.
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self._use_zip:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(stream, self._hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
630
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000631 def get_missing_items(self, items):
632 """Yields items that are missing from the server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000633
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000634 Issues multiple parallel queries via StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000635
636 Arguments:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400637 items: a list of isolate_storage.Item objects to check.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000638
639 Yields:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800640 For each missing item it yields a pair (item, push_state), where:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400641 * item - isolate_storage.Item object that is missing (one of |items|).
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800642 * push_state - opaque object that contains storage specific information
643 describing how to upload the item (for example in case of cloud
644 storage, it is signed upload URLs). It can later be passed to
645 'async_push'.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000646 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000647 channel = threading_utils.TaskChannel()
648 pending = 0
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800649
650 # Ensure all digests are calculated.
651 for item in items:
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700652 item.prepare(self._hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800653
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400654 def contains(batch):
655 if self._aborted:
656 raise Aborted()
657 return self._storage_api.contains(batch)
658
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000659 # Enqueue all requests.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800660 for batch in batch_items_for_check(items):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400661 self.net_thread_pool.add_task_with_channel(
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400662 channel, threading_utils.PRIORITY_HIGH, contains, batch)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000663 pending += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800664
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000665 # Yield results as they come in.
666 for _ in xrange(pending):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800667 for missing_item, push_state in channel.pull().iteritems():
668 yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000669
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000670
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800671def batch_items_for_check(items):
672 """Splits list of items to check for existence on the server into batches.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000673
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800674 Each batch corresponds to a single 'exists?' query to the server via a call
675 to StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000676
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800677 Arguments:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400678 items: a list of isolate_storage.Item objects.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800679
680 Yields:
681 Batches of items to query for existence in a single operation,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400682 each batch is a list of isolate_storage.Item objects.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800683 """
684 batch_count = 0
685 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[0]
686 next_queries = []
687 for item in sorted(items, key=lambda x: x.size, reverse=True):
688 next_queries.append(item)
689 if len(next_queries) == batch_size_limit:
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000690 yield next_queries
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800691 next_queries = []
692 batch_count += 1
693 batch_size_limit = ITEMS_PER_CONTAINS_QUERIES[
694 min(batch_count, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
695 if next_queries:
696 yield next_queries
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000697
698
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000699class FetchQueue(object):
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400700 """Fetches items from Storage and places them into ContentAddressedCache.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000701
702 It manages multiple concurrent fetch operations. Acts as a bridge between
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400703 Storage and ContentAddressedCache so that Storage and ContentAddressedCache
704 don't depend on each other at all.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000705 """
706
707 def __init__(self, storage, cache):
708 self.storage = storage
709 self.cache = cache
710 self._channel = threading_utils.TaskChannel()
711 self._pending = set()
712 self._accessed = set()
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +0000713 self._fetched = set(cache)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400714 # Pending digests that the caller waits for, see wait_on()/wait().
715 self._waiting_on = set()
716 # Already fetched digests the caller waits for which are not yet returned by
717 # wait().
718 self._waiting_on_ready = set()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000719
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400720 def add(
Vadim Shtayura3148e072014-09-02 18:51:52 -0700721 self,
722 digest,
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400723 size=local_caching.UNKNOWN_FILE_SIZE,
Vadim Shtayura3148e072014-09-02 18:51:52 -0700724 priority=threading_utils.PRIORITY_MED):
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000725 """Starts asynchronous fetch of item |digest|."""
726 # Fetching it now?
727 if digest in self._pending:
728 return
729
730 # Mark this file as in use, verify_all_cached will later ensure it is still
731 # in cache.
732 self._accessed.add(digest)
733
734 # Already fetched? Notify cache to update item's LRU position.
735 if digest in self._fetched:
736 # 'touch' returns True if item is in cache and not corrupted.
737 if self.cache.touch(digest, size):
738 return
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +0000739 logging.error('%s is corrupted', digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000740 self._fetched.remove(digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000741
742 # TODO(maruel): It should look at the free disk space, the current cache
743 # size and the size of the new item on every new item:
744 # - Trim the cache as more entries are listed when free disk space is low,
745 # otherwise if the amount of data downloaded during the run > free disk
746 # space, it'll crash.
747 # - Make sure there's enough free disk space to fit all dependencies of
748 # this run! If not, abort early.
749
750 # Start fetching.
751 self._pending.add(digest)
752 self.storage.async_fetch(
753 self._channel, priority, digest, size,
754 functools.partial(self.cache.write, digest))
755
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400756 def wait_on(self, digest):
757 """Updates digests to be waited on by 'wait'."""
758 # Calculate once the already fetched items. These will be retrieved first.
759 if digest in self._fetched:
760 self._waiting_on_ready.add(digest)
761 else:
762 self._waiting_on.add(digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000763
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400764 def wait(self):
765 """Waits until any of waited-on items is retrieved.
766
767 Once this happens, it is remove from the waited-on set and returned.
768
769 This function is called in two waves. The first wave it is done for HIGH
770 priority items, the isolated files themselves. The second wave it is called
771 for all the files.
772
773 If the waited-on set is empty, raises RuntimeError.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000774 """
775 # Flush any already fetched items.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400776 if self._waiting_on_ready:
777 return self._waiting_on_ready.pop()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000778
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400779 assert self._waiting_on, 'Needs items to wait on'
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000780
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400781 # Wait for one waited-on item to be fetched.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000782 while self._pending:
783 digest = self._channel.pull()
784 self._pending.remove(digest)
785 self._fetched.add(digest)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400786 if digest in self._waiting_on:
787 self._waiting_on.remove(digest)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000788 return digest
789
790 # Should never reach this point due to assert above.
791 raise RuntimeError('Impossible state')
792
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400793 @property
794 def wait_queue_empty(self):
795 """Returns True if there is no digest left for wait() to return."""
796 return not self._waiting_on and not self._waiting_on_ready
797
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000798 def inject_local_file(self, path, algo):
799 """Adds local file to the cache as if it was fetched from storage."""
maruel12e30012015-10-09 11:55:35 -0700800 with fs.open(path, 'rb') as f:
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000801 data = f.read()
802 digest = algo(data).hexdigest()
803 self.cache.write(digest, [data])
804 self._fetched.add(digest)
805 return digest
806
807 @property
808 def pending_count(self):
809 """Returns number of items to be fetched."""
810 return len(self._pending)
811
812 def verify_all_cached(self):
813 """True if all accessed items are in cache."""
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +0000814 # Not thread safe, but called after all work is done.
815 return self._accessed.issubset(self.cache)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000816
817
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
      * stream: an iterable yielding chunks of content
      * hasher: an object from hashlib that supports update() and hexdigest()
        (eg, hashlib.sha1).
      * expected_digest: if the entire stream is piped through hasher and then
        summarized via hexdigest(), this should be the result. That is, it
        should be a hex string like 'abc123'.
      * expected_size: either the expected size of the stream, or
        local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    # Running byte count and hash, updated chunk by chunk.
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Keep a one-chunk lookahead in |held|, so the last chunk is only released
    # to the consumer once the whole stream has been validated.
    held = None
    for incoming in self.stream:
      assert incoming is not None
      if held is not None:
        self._inspect_chunk(held, is_last=False)
        try:
          yield held
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      held = incoming
    if held is not None:
      self._inspect_chunk(held, is_last=True)
      try:
        yield held
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Folds |chunk| into the size/hash trackers; validates on the last one."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    size_known = self.expected_size != local_caching.UNKNOWN_FILE_SIZE
    if size_known and self.current_size != self.expected_size:
      raise IOError(
          'Incorrect file size: want %d, got %d' % (
              self.expected_size, self.current_size))

    actual_digest = self.rolling_hash.hexdigest()
    if actual_digest != self.expected_digest:
      raise IOError(
          'Incorrect digest: want %s, got %s' % (
              self.expected_digest, actual_digest))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000891
892
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700893class IsolatedBundle(object):
894 """Fetched and parsed .isolated file with all dependencies."""
895
Takuto Ikuta1e6072c2018-11-06 20:42:43 +0000896 def __init__(self, filter_cb):
897 """
898 filter_cb: callback function to filter downloaded content.
899 When filter_cb is not None, Isolated file is downloaded iff
900 filter_cb(filepath) returns True.
901 """
902
Vadim Shtayura3148e072014-09-02 18:51:52 -0700903 self.command = []
904 self.files = {}
905 self.read_only = None
906 self.relative_cwd = None
907 # The main .isolated file, a IsolatedFile instance.
908 self.root = None
909
Takuto Ikuta1e6072c2018-11-06 20:42:43 +0000910 self._filter_cb = filter_cb
911
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700912 def fetch(self, fetch_queue, root_isolated_hash, algo):
913 """Fetches the .isolated and all the included .isolated.
Vadim Shtayura3148e072014-09-02 18:51:52 -0700914
915 It enables support for "included" .isolated files. They are processed in
916 strict order but fetched asynchronously from the cache. This is important so
917 that a file in an included .isolated file that is overridden by an embedding
918 .isolated file is not fetched needlessly. The includes are fetched in one
919 pass and the files are fetched as soon as all the ones on the left-side
920 of the tree were fetched.
921
922 The prioritization is very important here for nested .isolated files.
923 'includes' have the highest priority and the algorithm is optimized for both
924 deep and wide trees. A deep one is a long link of .isolated files referenced
925 one at a time by one item in 'includes'. A wide one has a large number of
926 'includes' in a single .isolated file. 'left' is defined as an included
927 .isolated file earlier in the 'includes' list. So the order of the elements
928 in 'includes' is important.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700929
930 As a side effect this method starts asynchronous fetch of all data files
931 by adding them to |fetch_queue|. It doesn't wait for data files to finish
932 fetching though.
Vadim Shtayura3148e072014-09-02 18:51:52 -0700933 """
934 self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)
935
936 # Isolated files being retrieved now: hash -> IsolatedFile instance.
937 pending = {}
938 # Set of hashes of already retrieved items to refuse recursive includes.
939 seen = set()
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700940 # Set of IsolatedFile's whose data files have already being fetched.
941 processed = set()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700942
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700943 def retrieve_async(isolated_file):
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400944 """Retrieves an isolated file included by the root bundle."""
Vadim Shtayura3148e072014-09-02 18:51:52 -0700945 h = isolated_file.obj_hash
946 if h in seen:
947 raise isolated_format.IsolatedError(
948 'IsolatedFile %s is retrieved recursively' % h)
949 assert h not in pending
950 seen.add(h)
951 pending[h] = isolated_file
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400952 # This isolated item is being added dynamically, notify FetchQueue.
953 fetch_queue.wait_on(h)
Vadim Shtayura3148e072014-09-02 18:51:52 -0700954 fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)
955
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700956 # Start fetching root *.isolated file (single file, not the whole bundle).
957 retrieve_async(self.root)
Vadim Shtayura3148e072014-09-02 18:51:52 -0700958
959 while pending:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700960 # Wait until some *.isolated file is fetched, parse it.
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400961 item_hash = fetch_queue.wait()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700962 item = pending.pop(item_hash)
tansell9e04a8d2016-07-28 09:31:59 -0700963 with fetch_queue.cache.getfileobj(item_hash) as f:
964 item.load(f.read())
Vadim Shtayura3148e072014-09-02 18:51:52 -0700965
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700966 # Start fetching included *.isolated files.
Vadim Shtayura3148e072014-09-02 18:51:52 -0700967 for new_child in item.children:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700968 retrieve_async(new_child)
Vadim Shtayura3148e072014-09-02 18:51:52 -0700969
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700970 # Always fetch *.isolated files in traversal order, waiting if necessary
971 # until next to-be-processed node loads. "Waiting" is done by yielding
972 # back to the outer loop, that waits until some *.isolated is loaded.
973 for node in isolated_format.walk_includes(self.root):
974 if node not in processed:
975 # Not visited, and not yet loaded -> wait for it to load.
976 if not node.is_loaded:
977 break
978 # Not visited and loaded -> process it and continue the traversal.
979 self._start_fetching_files(node, fetch_queue)
980 processed.add(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -0700981
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700982 # All *.isolated files should be processed by now and only them.
983 all_isolateds = set(isolated_format.walk_includes(self.root))
984 assert all_isolateds == processed, (all_isolateds, processed)
Marc-Antoine Ruel2d631542018-04-19 20:28:09 -0400985 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
Vadim Shtayura3148e072014-09-02 18:51:52 -0700986
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700987 # Extract 'command' and other bundle properties.
988 for node in isolated_format.walk_includes(self.root):
989 self._update_self(node)
Vadim Shtayura3148e072014-09-02 18:51:52 -0700990 self.relative_cwd = self.relative_cwd or ''
991
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700992 def _start_fetching_files(self, isolated, fetch_queue):
993 """Starts fetching files from |isolated| that are not yet being fetched.
Vadim Shtayura3148e072014-09-02 18:51:52 -0700994
Vadim Shtayura7f7459c2014-09-04 13:25:10 -0700995 Modifies self.files.
996 """
maruel10bea7b2016-12-07 05:03:49 -0800997 files = isolated.data.get('files', {})
998 logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
999 for filepath, properties in files.iteritems():
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001000 if self._filter_cb and not self._filter_cb(filepath):
1001 continue
1002
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001003 # Root isolated has priority on the files being mapped. In particular,
1004 # overridden files must not be fetched.
1005 if filepath not in self.files:
1006 self.files[filepath] = properties
tansell9e04a8d2016-07-28 09:31:59 -07001007
1008 # Make sure if the isolated is read only, the mode doesn't have write
1009 # bits.
1010 if 'm' in properties and self.read_only:
1011 properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
1012
1013 # Preemptively request hashed files.
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001014 if 'h' in properties:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001015 fetch_queue.add(
1016 properties['h'], properties['s'], threading_utils.PRIORITY_MED)
1017
1018 def _update_self(self, node):
1019 """Extracts bundle global parameters from loaded *.isolated file.
1020
1021 Will be called with each loaded *.isolated file in order of traversal of
1022 isolated include graph (see isolated_format.walk_includes).
1023 """
Vadim Shtayura3148e072014-09-02 18:51:52 -07001024 # Grabs properties.
1025 if not self.command and node.data.get('command'):
1026 # Ensure paths are correctly separated on windows.
1027 self.command = node.data['command']
1028 if self.command:
1029 self.command[0] = self.command[0].replace('/', os.path.sep)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001030 if self.read_only is None and node.data.get('read_only') is not None:
1031 self.read_only = node.data['read_only']
1032 if (self.relative_cwd is None and
1033 node.data.get('relative_cwd') is not None):
1034 self.relative_cwd = node.data['relative_cwd']
1035
1036
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001037def get_storage(url, namespace):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001038 """Returns Storage class that can upload and download from |namespace|.
1039
1040 Arguments:
Marc-Antoine Ruelb10edf22014-12-11 13:33:57 -05001041 url: URL of isolate service to use shared cloud based storage.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001042 namespace: isolate namespace to operate in, also defines hashing and
1043 compression scheme used, i.e. namespace names that end with '-gzip'
1044 store compressed data.
1045
1046 Returns:
1047 Instance of Storage.
1048 """
aludwin81178302016-11-30 17:18:49 -08001049 return Storage(isolate_storage.get_storage_api(url, namespace))
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001050
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001051
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001052def upload_tree(base_url, infiles, namespace):
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001053 """Uploads the given tree to the given url.
1054
1055 Arguments:
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001056 base_url: The url of the isolate server to upload to.
1057 infiles: iterable of pairs (absolute path, metadata dict) of files.
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +00001058 namespace: The namespace to use on the server.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001059 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001060 # Convert |infiles| into a list of FileItem objects, skip duplicates.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001061 # Filter out symlinks, since they are not represented by items on isolate
1062 # server side.
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001063 items = []
1064 seen = set()
1065 skipped = 0
1066 for filepath, metadata in infiles:
maruel12e30012015-10-09 11:55:35 -07001067 assert isinstance(filepath, unicode), filepath
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001068 if 'l' not in metadata and filepath not in seen:
1069 seen.add(filepath)
1070 item = FileItem(
1071 path=filepath,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001072 digest=metadata['h'],
1073 size=metadata['s'],
1074 high_priority=metadata.get('priority') == '0')
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001075 items.append(item)
1076 else:
1077 skipped += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001078
Vadim Shtayuraea38c572014-10-06 16:57:16 -07001079 logging.info('Skipped %d duplicated entries', skipped)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001080 with get_storage(base_url, namespace) as storage:
maruel064c0a32016-04-05 11:47:15 -07001081 return storage.upload_items(items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001082
1083
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001084def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1085 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001086 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001087
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001088 Arguments:
1089 isolated_hash: hash of the root *.isolated file.
1090 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001091 cache: ContentAddressedCache class that knows how to store and map files
1092 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001093 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001094 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001095 filter_cb: filter that works as whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001096
1097 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001098 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001099 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001100 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001101 'fetch_isolated(%s, %s, %s, %s, %s)',
1102 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001103 # Hash algorithm to use, defined by namespace |storage| is using.
1104 algo = storage.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001105 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001106 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001107
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001108 with tools.Profiler('GetIsolateds'):
1109 # Optionally support local files by manually adding them to cache.
1110 if not isolated_format.is_valid_hash(isolated_hash, algo):
1111 logging.debug('%s is not a valid hash, assuming a file '
1112 '(algo was %s, hash size was %d)',
1113 isolated_hash, algo(), algo().digest_size)
1114 path = unicode(os.path.abspath(isolated_hash))
1115 try:
1116 isolated_hash = fetch_queue.inject_local_file(path, algo)
1117 except IOError as e:
1118 raise isolated_format.MappingError(
1119 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
1120 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001121
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001122 # Load all *.isolated and start loading rest of the files.
1123 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001124
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001125 with tools.Profiler('GetRest'):
1126 # Create file system hierarchy.
1127 file_path.ensure_tree(outdir)
1128 create_directories(outdir, bundle.files)
1129 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001130
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001131 # Ensure working directory exists.
1132 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
1133 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001134
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001135 # Multimap: digest -> list of pairs (path, props).
1136 remaining = {}
1137 for filepath, props in bundle.files.iteritems():
1138 if 'h' in props:
1139 remaining.setdefault(props['h'], []).append((filepath, props))
1140 fetch_queue.wait_on(props['h'])
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001141
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001142 # Now block on the remaining files to be downloaded and mapped.
1143 logging.info('Retrieving remaining files (%d of them)...',
1144 fetch_queue.pending_count)
1145 last_update = time.time()
1146 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1147 while remaining:
1148 detector.ping()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001149
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001150 # Wait for any item to finish fetching to cache.
1151 digest = fetch_queue.wait()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001152
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001153 # Create the files in the destination using item in cache as the
1154 # source.
1155 for filepath, props in remaining.pop(digest):
1156 fullpath = os.path.join(outdir, filepath)
tansell9e04a8d2016-07-28 09:31:59 -07001157
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001158 with cache.getfileobj(digest) as srcfileobj:
1159 filetype = props.get('t', 'basic')
tanselle4288c32016-07-28 09:45:40 -07001160
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001161 if filetype == 'basic':
1162 # Ignore all bits apart from the user.
1163 file_mode = (props.get('m') or 0500) & 0700
1164 if bundle.read_only:
1165 # Enforce read-only if the root bundle does.
1166 file_mode &= 0500
1167 putfile(
1168 srcfileobj, fullpath, file_mode,
1169 use_symlink=use_symlinks)
tanselle4288c32016-07-28 09:45:40 -07001170
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001171 elif filetype == 'tar':
1172 basedir = os.path.dirname(fullpath)
1173 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1174 for ti in t:
1175 if not ti.isfile():
1176 logging.warning(
1177 'Path(%r) is nonfile (%s), skipped',
1178 ti.name, ti.type)
1179 continue
1180 # Handle files created on Windows fetched on POSIX and the
1181 # reverse.
1182 other_sep = '/' if os.path.sep == '\\' else '\\'
1183 name = ti.name.replace(other_sep, os.path.sep)
1184 fp = os.path.normpath(os.path.join(basedir, name))
1185 if not fp.startswith(basedir):
1186 logging.error(
1187 'Path(%r) is outside root directory',
1188 fp)
1189 ifd = t.extractfile(ti)
1190 file_path.ensure_tree(os.path.dirname(fp))
1191 file_mode = ti.mode & 0700
1192 if bundle.read_only:
1193 # Enforce read-only if the root bundle does.
1194 file_mode &= 0500
1195 putfile(ifd, fp, file_mode, ti.size)
tansell26de79e2016-11-13 18:41:11 -08001196
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001197 else:
1198 raise isolated_format.IsolatedError(
1199 'Unknown file type %r', filetype)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001200
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001201 # Report progress.
1202 duration = time.time() - last_update
1203 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1204 msg = '%d files remaining...' % len(remaining)
1205 sys.stdout.write(msg + '\n')
1206 sys.stdout.flush()
1207 logging.info(msg)
1208 last_update = time.time()
1209 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001210
Marc-Antoine Ruele9558372018-08-03 03:41:22 +00001211 # Save the cache right away to not loose the state of the new objects.
1212 cache.save()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001213 # Cache could evict some items we just tried to fetch, it's a fatal error.
1214 if not fetch_queue.verify_all_cached():
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001215 free_disk = file_path.get_free_space(cache.cache_dir)
1216 msg = (
1217 'Cache is too small to hold all requested files.\n'
1218 ' %s\n cache=%dbytes, %d items; %sb free_space') % (
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +00001219 cache.policies, cache.total_size, len(cache), free_disk)
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001220 raise isolated_format.MappingError(msg)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001221 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001222
1223
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory.

  Arguments:
    root: directory to scan.
    algo: hashing algorithm used to digest each file.
    blacklist: function returning True for relative paths to skip.
  """
  root = file_path.get_native_path_case(root)
  # Last argument presumably toggles symlink expansion (off on Windows) —
  # see expand_directory_and_symlink.
  paths = isolated_format.expand_directory_and_symlink(
      root, u'.' + os.path.sep, blacklist, sys.platform != 'win32')
  metadata = {}
  for relpath in paths:
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo, False)
    # Drop the 't' entry (presumably the timestamp); it is not wanted in the
    # generated .isolated data.
    meta.pop('t')
    metadata[relpath] = meta
  # Entries without an 'h' (digest) key, e.g. symlinks, are not uploaded.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      continue
    items.append(
        FileItem(
            path=os.path.join(root, relpath),
            digest=meta['h'],
            size=meta['s'],
            high_priority=relpath.endswith('.isolated')))
  return items, metadata
1245
1246
def archive_files_to_storage(storage, files, blacklist):
  """Stores every entries and returns the relevant data.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: list of file paths to upload. If a directory is specified, a
        .isolated file is created and its hash is returned.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.

  Raises:
    Error: if an entry is neither a file nor a directory, or on OS failure
        while processing it.
    AlreadyExists: if the same path is listed twice.
  """
  assert all(isinstance(i, unicode) for i in files), files
  if len(files) != len(set(map(os.path.abspath, files))):
    raise AlreadyExists('Duplicate entries found.')

  # List of tuple(hash, path).
  results = []
  # The temporary directory is only created as needed.
  tempdir = None
  try:
    # TODO(maruel): Yield the files to a worker thread.
    items_to_upload = []
    for f in files:
      try:
        filepath = os.path.abspath(f)
        if fs.isdir(filepath):
          # Uploading a whole directory: hash every file in it, then generate
          # a .isolated file referencing them and upload that too.
          items, metadata = directory_to_metadata(
              filepath, storage.hash_algo, blacklist)

          # Create the .isolated file.
          if not tempdir:
            tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
          handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
          os.close(handle)
          data = {
              'algo':
                  isolated_format.SUPPORTED_ALGOS_REVERSE[storage.hash_algo],
              'files': metadata,
              'version': isolated_format.ISOLATED_FILE_VERSION,
          }
          isolated_format.save_isolated(isolated, data)
          h = isolated_format.hash_file(isolated, storage.hash_algo)
          items_to_upload.extend(items)
          items_to_upload.append(
              FileItem(
                  path=isolated,
                  digest=h,
                  size=fs.stat(isolated).st_size,
                  high_priority=True))
          # The directory is reported through its .isolated hash.
          results.append((h, f))

        elif fs.isfile(filepath):
          h = isolated_format.hash_file(filepath, storage.hash_algo)
          items_to_upload.append(
              FileItem(
                  path=filepath,
                  digest=h,
                  size=fs.stat(filepath).st_size,
                  high_priority=f.endswith('.isolated')))
          results.append((h, f))
        else:
          raise Error('%s is neither a file nor a directory.' % f)
      except OSError as e:
        # Include the OS level detail; the bare message made failures hard to
        # diagnose.
        raise Error('Failed to process %s: %s.' % (f, e))
    uploaded = storage.upload_items(items_to_upload)
    # Items the server did not already have ('cold') versus deduped ('hot').
    cold = [i for i in items_to_upload if i in uploaded]
    hot = [i for i in items_to_upload if i not in uploaded]
    return results, cold, hot
  finally:
    # Always clean up the temporary .isolated files, even on failure.
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001321
1322
def archive(out, namespace, files, blacklist):
  """Archives files to the isolate server and prints 'hash path' per entry.

  Arguments:
    out: isolate server URL.
    namespace: namespace to use on the server.
    files: list of paths to upload; ['-'] means read the list from stdin,
        one path per line.
    blacklist: list of regexps of files to skip when uploading a directory.
  """
  if files == ['-']:
    # readlines() keeps the trailing line terminator; strip it so it does not
    # end up embedded in the file paths below.
    files = [l.rstrip('\n') for l in sys.stdin.readlines()]

  if not files:
    raise Error('Nothing to upload')

  files = [f.decode('utf-8') for f in files]
  blacklist = tools.gen_blacklist(blacklist)
  with get_storage(out, namespace) as storage:
    # Ignore stats.
    results = archive_files_to_storage(storage, files, blacklist)[0]
  print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1336
1337
@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The commands output each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  # Validates/normalizes --isolate-server; both boolean flags request the
  # exception handler and make the server option mandatory.
  process_isolate_server_options(parser, options, True, True)
  try:
    archive(options.isolate_server, options.namespace, files, options.blacklist)
  except (Error, local_caching.NoMoreSpace) as e:
    # Surface expected failures as a clean command line error instead of a
    # traceback.
    parser.error(e.args[0])
  return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001359
1360
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Best effort; downloading falls back to hardlinks/copies when unavailable.
  if not file_path.enable_symlink():
    logging.error('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  # Exactly one of the two modes must be selected.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks require the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    # Refuse to extract a tree over an existing non-empty directory.
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  with get_storage(options.isolate_server, options.namespace) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(
                local_caching.file_write, os.path.join(options.target, dest)))
      # Drain the channel until every requested digest landed.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        # options.target is already absolute, so |rel| is the final path.
        # The previous code joined options.target onto it a second time,
        # which was only a no-op because os.path.join discards everything
        # before an absolute component.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print(' ' + ' '.join(bundle.command))

  return 0
1439
1440
def add_archive_options(parser):
  """Registers the archive-specific command line flags on |parser|."""
  parser.add_option(
      '--blacklist',
      default=list(DEFAULT_BLACKLIST),
      action='append',
      help='List of regexp to use as blacklist filter when uploading '
           'directories')
1447
1448
def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  # The server URL can also come from the environment to ease scripting.
  default_server = os.environ.get('ISOLATE_SERVER', '')
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL',
      default=default_server,
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace',
      default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')
1462
1463
def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return

  if not options.grpc_proxy:
    # Plain HTTP(S) mode: normalize the URL early so everything below sees
    # the canonical form.
    try:
      options.isolate_server = net.fix_url(options.isolate_server)
    except ValueError as exc:
      parser.error('--isolate-server %s' % exc)
  else:
    isolate_storage.set_grpc_proxy(options.grpc_proxy)

  if set_exception_handler:
    on_error.report_on_exception_exit(options.isolate_server)

  try:
    return auth.ensure_logged_in(options.isolate_server)
  except ValueError as exc:
    parser.error(str(exc))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001488
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001489
def add_cache_options(parser):
  """Registers the local cache tuning flags in their own option group."""
  group = optparse.OptionGroup(parser, 'Cache management')
  group.add_option(
      '--cache', metavar='DIR', default='cache',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  group.add_option(
      '--max-cache-size', type='int', metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  group.add_option(
      '--min-free-space', type='int', metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  group.add_option(
      '--max-items', type='int', metavar='NNN', default=100000,
      help='Trim if more than this number of items are in the cache '
           'default=%default')
  parser.add_option_group(group)
1517
1518
def process_cache_options(options, trim, **kwargs):
  """Builds the content addressed cache selected on the command line.

  Returns a MemoryContentAddressedCache when no --cache directory was given,
  otherwise a DiskContentAddressedCache rooted at that directory.
  """
  if not options.cache:
    return local_caching.MemoryContentAddressedCache()

  policies = local_caching.CachePolicies(
      options.max_cache_size,
      options.min_free_space,
      options.max_items,
      # 3 weeks.
      max_age_secs=21*24*60*60)

  # |options.cache| path may not exist until DiskContentAddressedCache()
  # instance is created.
  return local_caching.DiskContentAddressedCache(
      unicode(os.path.abspath(options.cache)),
      policies,
      isolated_format.get_hash_algo(options.namespace),
      trim,
      **kwargs)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001538
1539
class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  """Option parser with logging and authentication flags pre-registered."""

  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    # Every isolate server interaction may need credentials.
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    # Consumes the auth flags registered in __init__ before the command runs.
    auth.process_auth_options(self, options)
    return options, args
1554
1555
def main(args):
  """Runs the requested subcommand and returns the process exit code."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001559
1560
if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  # Presumably normalizes stdout/stderr encoding and flushes eagerly so
  # subcommand output streams immediately — see the utils modules.
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  # Enables ANSI color escape handling on Windows terminals.
  colorama.init()
  sys.exit(main(sys.argv[1:]))