blob: 646b3be29621303a956230f50bfe7718142e09d7 [file] [log] [blame]
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001#!/usr/bin/env python
maruelea586f32016-04-05 11:11:33 -07002# Copyright 2013 The LUCI Authors. All rights reserved.
maruelf1f5e2a2016-05-25 17:10:39 -07003# Use of this source code is governed under the Apache License, Version 2.0
4# that can be found in the LICENSE file.
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00005
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04006"""Archives a set of files or directories to an Isolate Server."""
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00007
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00008__version__ = '0.8.6'
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00009
nodir90bc8dc2016-06-15 13:35:21 -070010import errno
tansell9e04a8d2016-07-28 09:31:59 -070011import functools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000012import logging
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -040013import optparse
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000014import os
maruel@chromium.orgb7e79a22013-09-13 01:24:56 +000015import re
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +040016import signal
tansell9e04a8d2016-07-28 09:31:59 -070017import stat
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000018import sys
tansell26de79e2016-11-13 18:41:11 -080019import tarfile
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -050020import tempfile
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000021import time
csharp@chromium.org59c7bcf2012-11-21 21:13:18 +000022import zlib
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000023
maruel@chromium.orgfb78d432013-08-28 21:22:40 +000024from third_party import colorama
25from third_party.depot_tools import fix_encoding
26from third_party.depot_tools import subcommand
27
Marc-Antoine Ruel37989932013-11-19 16:28:08 -050028from utils import file_path
maruel12e30012015-10-09 11:55:35 -070029from utils import fs
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -040030from utils import logging_utils
vadimsh@chromium.org6b706212013-08-28 15:03:46 +000031from utils import net
Marc-Antoine Ruelcfb60852014-07-02 15:22:00 -040032from utils import on_error
maruel8e4e40c2016-05-30 06:21:07 -070033from utils import subprocess42
vadimsh@chromium.orgb074b162013-08-22 17:55:46 +000034from utils import threading_utils
vadimsh@chromium.orga4326472013-08-24 02:05:41 +000035from utils import tools
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000036
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080037import auth
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -040038import isolated_format
aludwin81178302016-11-30 17:18:49 -080039import isolate_storage
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -040040import local_caching
Vadim Shtayurae34e13a2014-02-02 11:23:26 -080041
maruel@chromium.orgc6f90062012-11-07 18:32:22 +000042
# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or uploads
# in Storage. If it takes longer than that, a deadlock might be happening
# and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60


# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: larger the file -> larger the
# possibility it has changed). Then first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and send to '/pre-upload', then next ITEMS_PER_CONTAINS_QUERIES[1],
# and so on. Numbers here is a trade-off; the more per request, the lower the
# effect of HTTP round trip latency and TCP-level chattiness. On the other hand,
# larger values cause longer lookups, increasing the initial latency to start
# uploading, which is especially an issue for large files. This value is
# optimized for the "few thousands files to look up with minimal number of large
# files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)


# A list of already compressed extension types that should not receive any
# compression before being uploaded.
# NOTE(review): extensions are listed without the leading dot; compare against
# os.path.splitext() output accordingly.
ALREADY_COMPRESSED_TYPES = [
  '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
  'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


# Default regexps of paths not to archive; presumably applied when scanning
# directories to upload — confirm with callers outside this chunk.
DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)
87
88
class Error(Exception):
  """Base class for generic runtime errors raised by this module."""
92
93
class Aborted(Error):
  """Raised when an operation is cancelled via Storage.abort()."""
97
98
class AlreadyExists(Error):
  """Raised when a file that is being created already exists on disk."""
101
102
def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as stream:
    if offset:
      stream.seek(offset)
    while True:
      chunk = stream.read(chunk_size)
      if chunk:
        yield chunk
      else:
        # EOF reached; closing the 'with' block releases the handle.
        return
113
114
def fileobj_path(fileobj):
  """Returns the file system path backing |fileobj|, or None.

  Any path returned is guaranteed to exist on disk, so it can be passed to
  file system operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return None

  # Objects produced by something like open("test.txt") (including ones
  # created by the standard library, outside our control) expose a str name.
  # All paths in this code base are unicode objects, so decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error; bail out on relative names and on paths that are gone.
  if not os.path.isabs(name) or not fs.exists(name):
    return None
  return name
140
141
# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    # A sized copy would silently skip data if the source was already read.
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  copied = 0
  while copied != size:
    # Never read past the requested size; with size == -1 read full chunks.
    to_read = chunk_size if size <= 0 else min(chunk_size, size - copied)
    buf = srcfileobj.read(to_read)
    if not buf:
      if size == -1:
        # Unsized copy: EOF is the normal termination condition.
        break
      raise IOError('partial file, got %s, wanted %s' % (copied, size))
    dstfileobj.write(buf)
    copied += len(buf)
169
170
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Put srcfileobj at the given dstpath with given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file like object be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, cause a metadata
    writeback (causing disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlink per
    partition. This is why the function automatically fallbacks to copying the
    file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure the
    same owner is for all hardlinks.
  - Anecdotal report that ext2 is known to be potentially faulty on high rate
    of hardlink creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    # A file is read only when none of the write permission bits are set.
    # file_mode of None means "keep the default", treated as read only here.
    # BUG FIX: the previous expression lacked the negation, so files *with*
    # write bits were linked (risking cache corruption through the link) and
    # read-only files were needlessly copied — the opposite of the intent
    # documented below.
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)
233
234
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  deflater = zlib.compressobj(level)
  for raw in content_generator:
    out = deflater.compress(raw)
    # zlib buffers internally; only yield when it actually produced output.
    if out:
      yield out
  # Flush whatever is still buffered and terminate the stream.
  trailing = deflater.flush(zlib.Z_FINISH)
  if trailing:
    yield trailing
245
246
def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that
  zip bomb file doesn't cause zlib to preallocate huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  inflater = zlib.decompressobj()
  bytes_in = 0
  try:
    for piece in content_generator:
      bytes_in += len(piece)
      out = inflater.decompress(piece, chunk_size)
      if out:
        yield out
      # Keep draining until the unconsumed tail is exhausted, so no single
      # decompress call can expand beyond |chunk_size|.
      while inflater.unconsumed_tail:
        out = inflater.decompress(inflater.unconsumed_tail, chunk_size)
        if out:
          yield out
    trailing = inflater.flush()
    if trailing:
      yield trailing
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (bytes_in, e))
  # Ensure all data was read and decompressed.
  if inflater.unused_data or inflater.unconsumed_tail:
    raise IOError('Not all data was decompressed')
277
278
def get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  # BUG FIX: os.path.splitext() returns the extension *with* its leading dot
  # ('.png'), while ALREADY_COMPRESSED_TYPES lists bare extensions ('png'),
  # so the membership test never matched and already-compressed files were
  # recompressed at level 7. Strip the dot before comparing.
  file_ext = os.path.splitext(filename)[1].lower().lstrip('.')
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7
284
285
def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Collect every ancestor directory of every file, so parents are known.
  wanted = set(os.path.dirname(f) for f in files)
  for start in list(wanted):
    parent = start
    while parent:
      wanted.add(parent)
      parent = os.path.dirname(parent)
  # Sorted order guarantees a parent is created before its children.
  for rel in sorted(wanted):
    if rel:
      abs_dir = os.path.join(base_directory, rel)
      if not fs.isdir(abs_dir):
        fs.mkdir(abs_dir)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000300
301
def create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      # Not a symlink entry.
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno != errno.EEXIST:
        raise
      raise AlreadyExists('File %s already exists.' % outfile)
maruel@chromium.orgaf254852013-09-17 17:48:14 +0000318
319
class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they will
  be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    # Only stat the file when the caller did not supply the size.
    if size is None:
      size = fs.stat(path).st_size
    super(FileItem, self).__init__(digest, size, high_priority)
    self.path = path
    self.compression_level = get_zip_compression_level(path)

  def content(self):
    """Returns an iterable yielding the file content in chunks."""
    return file_read(self.path)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +0000337
338
class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    # digest=None: presumably computed later by prepare() from the buffer
    # content — confirm against isolate_storage.Item.
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    # The raw bytes to upload.
    self.buffer = buf

  def content(self):
    # A single-element list (not a generator): safe to iterate repeatedly,
    # e.g. on upload retries.
    return [self.buffer]
348
349
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000350class Storage(object):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800351 """Efficiently downloads or uploads large set of files via StorageApi.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000352
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800353 Implements compression support, parallel 'contains' checks, parallel uploads
354 and more.
355
356 Works only within single namespace (and thus hashing algorithm and compression
357 scheme are fixed).
358
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400359 Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
360 signal handlers table to handle Ctrl+C.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800361 """
362
  def __init__(self, storage_api):
    """Wraps a StorageApi instance used for all network operations.

    Arguments:
      storage_api: StorageApi-like object providing server_ref, push, fetch
          and contains.
    """
    self._storage_api = storage_api
    # Thread pools are created lazily by the corresponding properties.
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    # Flipped to True by abort(); read without locking (see abort()).
    self._aborted = False
    # Signal handlers saved by __enter__ and restored by __exit__.
    self._prev_sig_handlers = {}
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000369
  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo and compression scheme
    (is_with_compression).
    """
    return self._storage_api.server_ref
Vadim Shtayurae0ab1902014-04-29 10:55:27 -0700377
378 @property
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000379 def cpu_thread_pool(self):
380 """ThreadPool for CPU-bound tasks like zipping."""
381 if self._cpu_thread_pool is None:
Marc-Antoine Ruelbdad1182015-02-06 16:04:35 -0500382 threads = max(threading_utils.num_processors(), 2)
383 if sys.maxsize <= 2L**32:
384 # On 32 bits userland, do not try to use more than 16 threads.
385 threads = min(threads, 16)
386 self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000387 return self._cpu_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000388
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000389 @property
390 def net_thread_pool(self):
391 """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
392 if self._net_thread_pool is None:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700393 self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000394 return self._net_thread_pool
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000395
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000396 def close(self):
397 """Waits for all pending tasks to finish."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400398 logging.info('Waiting for all threads to die...')
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000399 if self._cpu_thread_pool:
400 self._cpu_thread_pool.join()
401 self._cpu_thread_pool.close()
402 self._cpu_thread_pool = None
403 if self._net_thread_pool:
404 self._net_thread_pool.join()
405 self._net_thread_pool.close()
406 self._net_thread_pool = None
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400407 logging.info('Done.')
408
409 def abort(self):
410 """Cancels any pending or future operations."""
411 # This is not strictly theadsafe, but in the worst case the logging message
412 # will be printed twice. Not a big deal. In other places it is assumed that
413 # unprotected reads and writes to _aborted are serializable (it is true
414 # for python) and thus no locking is used.
415 if not self._aborted:
416 logging.warning('Aborting... It can take a while.')
417 self._aborted = True
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000418
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000419 def __enter__(self):
420 """Context manager interface."""
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400421 assert not self._prev_sig_handlers, self._prev_sig_handlers
422 for s in (signal.SIGINT, signal.SIGTERM):
423 self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000424 return self
425
426 def __exit__(self, _exc_type, _exc_value, _traceback):
427 """Context manager interface."""
428 self.close()
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400429 while self._prev_sig_handlers:
430 s, h = self._prev_sig_handlers.popitem()
431 signal.signal(s, h)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000432 return False
433
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000434 def upload_items(self, items):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800435 """Uploads a bunch of items to the isolate server.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000436
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800437 It figures out what items are missing from the server and uploads only them.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000438
439 Arguments:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400440 items: list of isolate_storage.Item instances that represents data to
441 upload.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000442
443 Returns:
444 List of items that were uploaded. All other items are already there.
445 """
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700446 logging.info('upload_items(items=%d)', len(items))
447
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800448 # Ensure all digests are calculated.
449 for item in items:
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000450 item.prepare(self.server_ref.hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800451
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400452 # For each digest keep only first isolate_storage.Item that matches it. All
453 # other items are just indistinguishable copies from the point of view of
454 # isolate server (it doesn't care about paths at all, only content and
455 # digests).
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000456 seen = {}
457 duplicates = 0
458 for item in items:
459 if seen.setdefault(item.digest, item) is not item:
460 duplicates += 1
461 items = seen.values()
462 if duplicates:
Vadim Shtayuraea38c572014-10-06 16:57:16 -0700463 logging.info('Skipped %d files with duplicated content', duplicates)
vadimsh@chromium.org672cd2b2013-10-08 17:49:33 +0000464
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000465 # Enqueue all upload tasks.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000466 missing = set()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000467 uploaded = []
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800468 channel = threading_utils.TaskChannel()
469 for missing_item, push_state in self.get_missing_items(items):
470 missing.add(missing_item)
471 self.async_push(channel, missing_item, push_state)
472
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000473 # No need to spawn deadlock detector thread if there's nothing to upload.
474 if missing:
Vadim Shtayura3148e072014-09-02 18:51:52 -0700475 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000476 # Wait for all started uploads to finish.
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000477 while len(uploaded) != len(missing):
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000478 detector.ping()
479 item = channel.pull()
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000480 uploaded.append(item)
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000481 logging.debug(
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000482 'Uploaded %d / %d: %s', len(uploaded), len(missing), item.digest)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000483 logging.info('All files are uploaded')
484
485 # Print stats.
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000486 total = len(items)
487 total_size = sum(f.size for f in items)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000488 logging.info(
489 'Total: %6d, %9.1fkb',
490 total,
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000491 total_size / 1024.)
492 cache_hit = set(items) - missing
493 cache_hit_size = sum(f.size for f in cache_hit)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000494 logging.info(
495 'cache hit: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
496 len(cache_hit),
497 cache_hit_size / 1024.,
498 len(cache_hit) * 100. / total,
499 cache_hit_size * 100. / total_size if total_size else 0)
500 cache_miss = missing
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000501 cache_miss_size = sum(f.size for f in cache_miss)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000502 logging.info(
503 'cache miss: %6d, %9.1fkb, %6.2f%% files, %6.2f%% size',
504 len(cache_miss),
505 cache_miss_size / 1024.,
506 len(cache_miss) * 100. / total,
507 cache_miss_size * 100. / total_size if total_size else 0)
508
vadimsh@chromium.orgf24e5c32013-10-11 21:16:21 +0000509 return uploaded
510
  def async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with
    'get_missing_items' call. 'get_missing_items' returns |push_state| object
    that contains storage specific information describing how to upload
    the item (for example in case of cloud storage, it is signed upload URLs).

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by 'get_missing_items' call for |item|.

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self.server_ref.hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until all file is zipped.
      # NOTE(review): reads item.compression_level — FileItem sets it, but
      # BufferItem (above) does not; presumably the base class provides it —
      # confirm against isolate_storage.Item.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        # Propagate the failure to the channel consumer instead of losing it
        # in the worker thread.
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not same as the
      # one provided by 'item'. Since '[data]' is a list, it can safely be
      # reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000565
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800566 def push(self, item, push_state):
567 """Synchronously pushes a single item to the server.
568
569 If you need to push many items at once, consider using 'upload_items' or
570 'async_push' with instance of TaskChannel.
571
572 Arguments:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400573 item: item to upload as instance of isolate_storage.Item class.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800574 push_state: push state returned by 'get_missing_items' call for |item|.
575
576 Returns:
577 Pushed item (same object as |item|).
578 """
579 channel = threading_utils.TaskChannel()
Vadim Shtayura3148e072014-09-02 18:51:52 -0700580 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800581 self.async_push(channel, item, push_state)
582 pushed = channel.pull()
583 assert pushed is item
584 return item
585
  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline: raw fetch, optional decompression,
        # then size/hash verification.
        stream = self._storage_api.fetch(digest, size, 0)
        if self.server_ref.is_with_compression:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(
            stream, self.server_ref.hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        # Re-raise so the channel reports the failure to its consumer.
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)
615
vadimsh@chromium.orgbcb966b2013-10-01 18:14:18 +0000616 def get_missing_items(self, items):
617 """Yields items that are missing from the server.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000618
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000619 Issues multiple parallel queries via StorageApi's 'contains' method.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000620
621 Arguments:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400622 items: a list of isolate_storage.Item objects to check.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000623
624 Yields:
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800625 For each missing item it yields a pair (item, push_state), where:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -0400626 * item - isolate_storage.Item object that is missing (one of |items|).
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800627 * push_state - opaque object that contains storage specific information
628 describing how to upload the item (for example in case of cloud
629 storage, it is signed upload URLs). It can later be passed to
630 'async_push'.
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000631 """
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000632 channel = threading_utils.TaskChannel()
633 pending = 0
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800634
635 # Ensure all digests are calculated.
636 for item in items:
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +0000637 item.prepare(self.server_ref.hash_algo)
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800638
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400639 def contains(batch):
640 if self._aborted:
641 raise Aborted()
642 return self._storage_api.contains(batch)
643
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000644 # Enqueue all requests.
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800645 for batch in batch_items_for_check(items):
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -0400646 self.net_thread_pool.add_task_with_channel(
Vadim Shtayuraf9e401b2014-10-15 18:19:37 +0400647 channel, threading_utils.PRIORITY_HIGH, contains, batch)
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000648 pending += 1
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800649
vadimsh@chromium.org7cdf1c02013-09-25 00:24:16 +0000650 # Yield results as they come in.
651 for _ in xrange(pending):
Vadim Shtayurabcff74f2014-02-27 16:19:34 -0800652 for missing_item, push_state in channel.pull().iteritems():
653 yield missing_item, push_state
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000654
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000655
def batch_items_for_check(items):
  """Splits list of items to check for existence on the server into batches.

  Each batch corresponds to a single 'exists?' query to the server via a call
  to StorageApi's 'contains' method.

  Arguments:
    items: a list of isolate_storage.Item objects.

  Yields:
    Batches of items to query for existence in a single operation,
    each batch is a list of isolate_storage.Item objects.
  """
  # Items are processed largest first (sorted by size, descending).
  ordered = sorted(items, key=lambda x: x.size, reverse=True)
  emitted = 0
  limit = ITEMS_PER_CONTAINS_QUERIES[0]
  current = []
  for entry in ordered:
    current.append(entry)
    if len(current) < limit:
      continue
    yield current
    current = []
    emitted += 1
    # Batch sizes follow ITEMS_PER_CONTAINS_QUERIES; clamp the index so the
    # last configured size is reused once the table is exhausted.
    limit = ITEMS_PER_CONTAINS_QUERIES[
        min(emitted, len(ITEMS_PER_CONTAINS_QUERIES) - 1)]
  if current:
    yield current
vadimsh@chromium.org35122be2013-09-19 02:48:00 +0000682
683
class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    """
    Arguments:
      storage: Storage instance whose async_fetch() is used to download items.
      cache: ContentAddressedCache instance that receives the downloaded data.
    """
    self.storage = storage
    self.cache = cache
    # Channel on which Storage.async_fetch() reports each completed digest.
    self._channel = threading_utils.TaskChannel()
    # Digests whose fetch has been started but has not completed yet.
    self._pending = set()
    # Every digest ever requested through add(); checked against the cache by
    # verify_all_cached() once all work is done.
    self._accessed = set()
    # Digests believed to be in the cache, seeded with the cache's existing
    # content.
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned by
    # wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|.

    Arguments:
      digest: hex digest of the item to fetch.
      size: expected item size, or local_caching.UNKNOWN_FILE_SIZE.
      priority: thread pool priority for the fetch task.
    """
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is still
    # in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      # Cached copy is corrupted: forget it and fall through to re-fetch.
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves. The first wave it is done for HIGH
    priority items, the isolated files themselves. The second wave it is called
    for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.pull()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage.

    Arguments:
      path: path of the local file to inject.
      algo: hash constructor (e.g. hashlib.sha1) used to compute the digest.

    Returns:
      Hex digest of the injected content.
    """
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000801
802
class FetchStreamVerifier(object):
  """Verifies that fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
      * stream: an iterable yielding chunks of content
      * hasher: an object from hashlib that supports update() and hexdigest()
        (eg, hashlib.sha1).
      * expected_digest: if the entire stream is piped through hasher and then
        summarized via hexdigest(), this should be the result. That is, it
        should be a hex string like 'abc123'.
      * expected_size: either the expected size of the stream, or
        local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Keep one chunk buffered in |held_back| so that the stream can be fully
    # verified before the final chunk reaches the consumer.
    held_back = None
    for incoming in self.stream:
      assert incoming is not None
      if held_back is not None:
        self._inspect_chunk(held_back, is_last=False)
        try:
          yield held_back
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      held_back = incoming
    if held_back is not None:
      # Final chunk: run the size/digest verification before handing it over.
      self._inspect_chunk(held_back, is_last=True)
      try:
        yield held_back
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    # Whole stream consumed: check size first, then the digest.
    size_is_known = self.expected_size != local_caching.UNKNOWN_FILE_SIZE
    if size_is_known and self.expected_size != self.current_size:
      raise IOError('Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size))

    actual_digest = self.rolling_hash.hexdigest()
    if actual_digest != self.expected_digest:
      raise IOError('Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +0000876
877
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self, filter_cb):
    """
    Arguments:
      filter_cb: callback function to filter downloaded content.
          When filter_cb is not None, Isolated file is downloaded iff
          filter_cb(filepath) returns True.
    """

    # Command to run, taken from the first *.isolated that defines 'command'.
    self.command = []
    # Maps relative file path -> properties dict, merged across all includes.
    self.files = {}
    # Value of the first 'read_only' entry found, or None if never set.
    self.read_only = None
    # Relative working directory; coerced to '' at the end of fetch().
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

    self._filter_cb = filter_cb

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important so
    that a file in an included .isolated file that is overridden by an embedding
    .isolated file is not fetched needlessly. The includes are fetched in one
    pass and the files are fetched as soon as all the ones on the left-side
    of the tree were fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for both
    deep and wide trees. A deep one is a long link of .isolated files referenced
    one at a time by one item in 'includes'. A wide one has a large number of
    'includes' in a single .isolated file. 'left' is defined as an included
    .isolated file earlier in the 'includes' list. So the order of the elements
    in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.

    Arguments:
      fetch_queue: FetchQueue used both for *.isolated and data files.
      root_isolated_hash: hex digest of the root *.isolated file.
      algo: hash constructor matching the namespace (e.g. hashlib.sha1).
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already being fetched.
    processed = set()

    def retrieve_async(isolated_file):
      """Retrieves an isolated file included by the root bundle."""
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      # This isolated item is being added dynamically, notify FetchQueue.
      fetch_queue.wait_on(h)
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait()
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      # Skip files rejected by the optional whitelist callback.
      if self._filter_cb and not self._filter_cb(filepath):
        continue

      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure if the isolated is read only, the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties. First node in traversal order to define each property
    # wins; later nodes never override an already-set value.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']
1020
1021
def get_storage(server_ref):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    server_ref: isolate_storage.ServerRef instance.

  Returns:
    Instance of Storage.
  """
  assert isinstance(server_ref, isolate_storage.ServerRef), repr(server_ref)
  # Pick the StorageApi implementation matching the server, then wrap it.
  api = isolate_storage.get_storage_api(server_ref)
  return Storage(api)
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001033
maruel@chromium.orgdedbf492013-09-12 20:42:11 +00001034
def upload_tree(server_ref, infiles):
  """Uploads the given tree to the given url.

  Arguments:
    server_ref: isolate_storage.ServerRef instance.
    infiles: iterable of pairs (absolute path, metadata dict) of files.
  """
  # Convert |infiles| into a list of FileItem objects, skip duplicates.
  # Filter out symlinks, since they are not represented by items on isolate
  # server side.
  file_items = []
  known_paths = set()
  skipped = 0
  for filepath, metadata in infiles:
    if 'l' in metadata or filepath in known_paths:
      # Either a symlink ('l' key present) or a path already queued.
      skipped += 1
      continue
    known_paths.add(filepath)
    file_items.append(FileItem(
        path=filepath,
        digest=metadata['h'],
        size=metadata['s'],
        high_priority=metadata.get('priority') == '0'))

  logging.info('Skipped %d duplicated entries', skipped)
  with get_storage(server_ref) as storage:
    return storage.upload_items(file_items)
Vadim Shtayura3148e072014-09-02 18:51:52 -07001063
1064
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001065def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1066 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001067 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001068
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001069 Arguments:
1070 isolated_hash: hash of the root *.isolated file.
1071 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001072 cache: ContentAddressedCache class that knows how to store and map files
1073 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001074 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001075 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001076 filter_cb: filter that works as whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001077
1078 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001079 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001080 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001081 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001082 'fetch_isolated(%s, %s, %s, %s, %s)',
1083 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001084 # Hash algorithm to use, defined by namespace |storage| is using.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001085 algo = storage.server_ref.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001086 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001087 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001088
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001089 with tools.Profiler('GetIsolateds'):
1090 # Optionally support local files by manually adding them to cache.
1091 if not isolated_format.is_valid_hash(isolated_hash, algo):
1092 logging.debug('%s is not a valid hash, assuming a file '
1093 '(algo was %s, hash size was %d)',
1094 isolated_hash, algo(), algo().digest_size)
1095 path = unicode(os.path.abspath(isolated_hash))
1096 try:
1097 isolated_hash = fetch_queue.inject_local_file(path, algo)
1098 except IOError as e:
1099 raise isolated_format.MappingError(
1100 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
1101 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001102
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001103 # Load all *.isolated and start loading rest of the files.
1104 bundle.fetch(fetch_queue, isolated_hash, algo)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001105
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001106 with tools.Profiler('GetRest'):
1107 # Create file system hierarchy.
1108 file_path.ensure_tree(outdir)
1109 create_directories(outdir, bundle.files)
1110 create_symlinks(outdir, bundle.files.iteritems())
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001111
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001112 # Ensure working directory exists.
1113 cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
1114 file_path.ensure_tree(cwd)
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001115
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001116 # Multimap: digest -> list of pairs (path, props).
1117 remaining = {}
1118 for filepath, props in bundle.files.iteritems():
1119 if 'h' in props:
1120 remaining.setdefault(props['h'], []).append((filepath, props))
1121 fetch_queue.wait_on(props['h'])
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001122
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001123 # Now block on the remaining files to be downloaded and mapped.
1124 logging.info('Retrieving remaining files (%d of them)...',
1125 fetch_queue.pending_count)
1126 last_update = time.time()
1127 with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
1128 while remaining:
1129 detector.ping()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001130
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001131 # Wait for any item to finish fetching to cache.
1132 digest = fetch_queue.wait()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001133
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001134 # Create the files in the destination using item in cache as the
1135 # source.
1136 for filepath, props in remaining.pop(digest):
1137 fullpath = os.path.join(outdir, filepath)
tansell9e04a8d2016-07-28 09:31:59 -07001138
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001139 with cache.getfileobj(digest) as srcfileobj:
1140 filetype = props.get('t', 'basic')
tanselle4288c32016-07-28 09:45:40 -07001141
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001142 if filetype == 'basic':
1143 # Ignore all bits apart from the user.
1144 file_mode = (props.get('m') or 0500) & 0700
1145 if bundle.read_only:
1146 # Enforce read-only if the root bundle does.
1147 file_mode &= 0500
1148 putfile(
1149 srcfileobj, fullpath, file_mode,
1150 use_symlink=use_symlinks)
tanselle4288c32016-07-28 09:45:40 -07001151
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001152 elif filetype == 'tar':
1153 basedir = os.path.dirname(fullpath)
1154 with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
1155 for ti in t:
1156 if not ti.isfile():
1157 logging.warning(
1158 'Path(%r) is nonfile (%s), skipped',
1159 ti.name, ti.type)
1160 continue
1161 # Handle files created on Windows fetched on POSIX and the
1162 # reverse.
1163 other_sep = '/' if os.path.sep == '\\' else '\\'
1164 name = ti.name.replace(other_sep, os.path.sep)
1165 fp = os.path.normpath(os.path.join(basedir, name))
1166 if not fp.startswith(basedir):
1167 logging.error(
1168 'Path(%r) is outside root directory',
1169 fp)
1170 ifd = t.extractfile(ti)
1171 file_path.ensure_tree(os.path.dirname(fp))
1172 file_mode = ti.mode & 0700
1173 if bundle.read_only:
1174 # Enforce read-only if the root bundle does.
1175 file_mode &= 0500
1176 putfile(ifd, fp, file_mode, ti.size)
tansell26de79e2016-11-13 18:41:11 -08001177
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001178 else:
1179 raise isolated_format.IsolatedError(
1180 'Unknown file type %r', filetype)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001181
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001182 # Report progress.
1183 duration = time.time() - last_update
1184 if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
1185 msg = '%d files remaining...' % len(remaining)
1186 sys.stdout.write(msg + '\n')
1187 sys.stdout.flush()
1188 logging.info(msg)
1189 last_update = time.time()
1190 assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001191
Marc-Antoine Ruele9558372018-08-03 03:41:22 +00001192 # Save the cache right away to not loose the state of the new objects.
1193 cache.save()
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001194 # Cache could evict some items we just tried to fetch, it's a fatal error.
1195 if not fetch_queue.verify_all_cached():
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001196 free_disk = file_path.get_free_space(cache.cache_dir)
1197 msg = (
1198 'Cache is too small to hold all requested files.\n'
1199 ' %s\n cache=%dbytes, %d items; %sb free_space') % (
Marc-Antoine Ruel5d7606b2018-06-15 19:06:12 +00001200 cache.policies, cache.total_size, len(cache), free_disk)
Marc-Antoine Rueldddf6172018-01-23 14:25:43 -05001201 raise isolated_format.MappingError(msg)
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001202 return bundle
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001203
1204
def directory_to_metadata(root, algo, blacklist):
  """Returns the FileItem list and .isolated metadata for a directory."""
  root = file_path.get_native_path_case(root)
  rel_paths = isolated_format.expand_directory_and_symlink(
      root, u'.' + os.path.sep, blacklist, sys.platform != 'win32')
  # Compute metadata for every entry, keyed by path relative to |root|.
  metadata = {}
  for relpath in rel_paths:
    metadata[relpath] = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo, False)
  # Strip the 't' entry from each metadata dict before returning it.
  for meta in metadata.itervalues():
    meta.pop('t')
  # Only entries carrying a digest ('h') become FileItem uploads; .isolated
  # files themselves are marked high priority.
  items = []
  for relpath, meta in metadata.iteritems():
    if 'h' not in meta:
      continue
    items.append(FileItem(
        path=os.path.join(root, relpath),
        digest=meta['h'],
        size=meta['s'],
        high_priority=relpath.endswith('.isolated')))
  return items, metadata
1226
1227
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001228def archive_files_to_storage(storage, files, blacklist):
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001229 """Stores every entries and returns the relevant data.
1230
1231 Arguments:
1232 storage: a Storage object that communicates with the remote object store.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001233 files: list of file paths to upload. If a directory is specified, a
1234 .isolated file is created and its hash is returned.
1235 blacklist: function that returns True if a file should be omitted.
maruel064c0a32016-04-05 11:47:15 -07001236
1237 Returns:
1238 tuple(list(tuple(hash, path)), list(FileItem cold), list(FileItem hot)).
1239 The first file in the first item is always the isolated file.
Marc-Antoine Ruel2283ad12014-02-09 11:14:57 -05001240 """
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001241 assert all(isinstance(i, unicode) for i in files), files
1242 if len(files) != len(set(map(os.path.abspath, files))):
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001243 raise AlreadyExists('Duplicate entries found.')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001244
maruel064c0a32016-04-05 11:47:15 -07001245 # List of tuple(hash, path).
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001246 results = []
1247 # The temporary directory is only created as needed.
1248 tempdir = None
1249 try:
1250 # TODO(maruel): Yield the files to a worker thread.
1251 items_to_upload = []
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001252 hash_algo = storage.server_ref.hash_algo
1253 hash_algo_name = storage.server_ref.hash_algo_name
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001254 for f in files:
1255 try:
1256 filepath = os.path.abspath(f)
maruel12e30012015-10-09 11:55:35 -07001257 if fs.isdir(filepath):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001258 # Uploading a whole directory.
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001259 items, metadata = directory_to_metadata(
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001260 filepath, hash_algo, blacklist)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001261
1262 # Create the .isolated file.
1263 if not tempdir:
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04001264 tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001265 handle, isolated = tempfile.mkstemp(
1266 dir=tempdir, suffix=u'.isolated')
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001267 os.close(handle)
1268 data = {
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001269 'algo': hash_algo_name,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001270 'files': metadata,
Marc-Antoine Ruel8bee66d2014-08-28 19:02:07 -04001271 'version': isolated_format.ISOLATED_FILE_VERSION,
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001272 }
Marc-Antoine Ruel52436aa2014-08-28 21:57:57 -04001273 isolated_format.save_isolated(isolated, data)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001274 h = isolated_format.hash_file(isolated, hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001275 items_to_upload.extend(items)
1276 items_to_upload.append(
1277 FileItem(
1278 path=isolated,
1279 digest=h,
maruel12e30012015-10-09 11:55:35 -07001280 size=fs.stat(isolated).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001281 high_priority=True))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001282 results.append((h, f))
1283
maruel12e30012015-10-09 11:55:35 -07001284 elif fs.isfile(filepath):
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001285 h = isolated_format.hash_file(filepath, hash_algo)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001286 items_to_upload.append(
1287 FileItem(
1288 path=filepath,
1289 digest=h,
maruel12e30012015-10-09 11:55:35 -07001290 size=fs.stat(filepath).st_size,
Vadim Shtayurabcff74f2014-02-27 16:19:34 -08001291 high_priority=f.endswith('.isolated')))
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001292 results.append((h, f))
1293 else:
1294 raise Error('%s is neither a file or directory.' % f)
1295 except OSError:
1296 raise Error('Failed to process %s.' % f)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001297
1298 # Essentially upload_tree().
maruel064c0a32016-04-05 11:47:15 -07001299 uploaded = storage.upload_items(items_to_upload)
1300 cold = [i for i in items_to_upload if i in uploaded]
1301 hot = [i for i in items_to_upload if i not in uploaded]
1302 return results, cold, hot
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001303 finally:
maruel12e30012015-10-09 11:55:35 -07001304 if tempdir and fs.isdir(tempdir):
Marc-Antoine Ruele4ad07e2014-10-15 20:22:29 -04001305 file_path.rmtree(tempdir)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001306
1307
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001308def archive(server_ref, files, blacklist):
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001309 if files == ['-']:
1310 files = sys.stdin.readlines()
1311
1312 if not files:
1313 raise Error('Nothing to upload')
1314
1315 files = [f.decode('utf-8') for f in files]
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001316 blacklist = tools.gen_blacklist(blacklist)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001317 with get_storage(server_ref) as storage:
1318 results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
Marc-Antoine Ruel488ce8f2014-02-09 11:25:04 -05001319 print('\n'.join('%s %s' % (r[0], r[1]) for r in results))
1320
1321
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001322@subcommand.usage('<file1..fileN> or - to read from stdin')
1323def CMDarchive(parser, args):
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001324 """Archives data to the server.
1325
1326 If a directory is specified, a .isolated file is created the whole directory
1327 is uploaded. Then this .isolated file can be included in another one to run
1328 commands.
1329
1330 The commands output each file that was processed with its content hash. For
1331 directories, the .isolated generated for the directory is listed as the
1332 directory entry itself.
1333 """
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001334 add_isolate_server_options(parser)
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001335 add_archive_options(parser)
maruel@chromium.orgcb3c3d52013-03-14 18:55:30 +00001336 options, files = parser.parse_args(args)
nodir55be77b2016-05-03 09:39:57 -07001337 process_isolate_server_options(parser, options, True, True)
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001338 server_ref = isolate_storage.ServerRef(
1339 options.isolate_server, options.namespace)
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001340 try:
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001341 archive(server_ref, files, options.blacklist)
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001342 except (Error, local_caching.NoMoreSpace) as e:
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001343 parser.error(e.args[0])
Marc-Antoine Ruelfcc3cd82013-11-19 16:31:38 -05001344 return 0
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001345
1346
def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  # Best effort: only logs when symlink support cannot be enabled; execution
  # continues regardless.
  if not file_path.enable_symlink():
    logging.error('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  # Exactly one of --isolated / --file must be given; both truthinesses equal
  # means either both or neither were provided.
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks require the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  # A tree download refuses to write into an existing file or a non-empty
  # directory.
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  with get_storage(server_ref) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(
                local_caching.file_write, os.path.join(options.target, dest)))
      # Drain the channel: each pull() yields the digest of a completed fetch.
      while pending:
        fetched = channel.pull()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        # NOTE(review): |rel| is already absolute (options.target was made
        # absolute above), so the second os.path.join below is a no-op;
        # printing |rel| directly would be equivalent and clearer.
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' %
              os.path.join(options.target, rel))
        print(' ' + ' '.join(bundle.command))

  return 0
1427
1428
Marc-Antoine Ruel1f8ba352014-11-04 15:55:03 -05001429def add_archive_options(parser):
1430 parser.add_option(
1431 '--blacklist',
1432 action='append', default=list(DEFAULT_BLACKLIST),
1433 help='List of regexp to use as blacklist filter when uploading '
1434 'directories')
1435
1436
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001437def add_isolate_server_options(parser):
1438 """Adds --isolate-server and --namespace options to parser."""
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001439 parser.add_option(
1440 '-I', '--isolate-server',
1441 metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001442 help='URL of the Isolate Server to use. Defaults to the environment '
1443 'variable ISOLATE_SERVER if set. No need to specify https://, this '
1444 'is assumed.')
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001445 parser.add_option(
aludwind7b7b7e2017-06-29 16:38:50 -07001446 '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
aludwin81178302016-11-30 17:18:49 -08001447 parser.add_option(
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001448 '--namespace', default='default-gzip',
1449 help='The namespace to use on the Isolate Server, default: %default')
1450
1451
nodir55be77b2016-05-03 09:39:57 -07001452def process_isolate_server_options(
1453 parser, options, set_exception_handler, required):
1454 """Processes the --isolate-server option.
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001455
1456 Returns the identity as determined by the server.
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001457 """
1458 if not options.isolate_server:
nodir55be77b2016-05-03 09:39:57 -07001459 if required:
1460 parser.error('--isolate-server is required.')
1461 return
1462
aludwind7b7b7e2017-06-29 16:38:50 -07001463 if options.grpc_proxy:
1464 isolate_storage.set_grpc_proxy(options.grpc_proxy)
aludwin81178302016-11-30 17:18:49 -08001465 else:
1466 try:
1467 options.isolate_server = net.fix_url(options.isolate_server)
1468 except ValueError as e:
1469 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001470 if set_exception_handler:
1471 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001472 try:
1473 return auth.ensure_logged_in(options.isolate_server)
1474 except ValueError as e:
1475 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001476
Marc-Antoine Ruel1687b5e2014-02-06 17:47:53 -05001477
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001478def add_cache_options(parser):
1479 cache_group = optparse.OptionGroup(parser, 'Cache management')
1480 cache_group.add_option(
Marc-Antoine Ruel5aeb3bb2018-06-16 13:11:02 +00001481 '--cache', metavar='DIR', default='cache',
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001482 help='Directory to keep a local cache of the files. Accelerates download '
1483 'by reusing already downloaded files. Default=%default')
1484 cache_group.add_option(
1485 '--max-cache-size',
1486 type='int',
1487 metavar='NNN',
maruel71586102016-01-29 11:44:09 -08001488 default=50*1024*1024*1024,
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001489 help='Trim if the cache gets larger than this value, default=%default')
1490 cache_group.add_option(
1491 '--min-free-space',
1492 type='int',
1493 metavar='NNN',
1494 default=2*1024*1024*1024,
1495 help='Trim if disk free space becomes lower than this value, '
1496 'default=%default')
1497 cache_group.add_option(
1498 '--max-items',
1499 type='int',
1500 metavar='NNN',
1501 default=100000,
1502 help='Trim if more than this number of items are in the cache '
1503 'default=%default')
1504 parser.add_option_group(cache_group)
1505
1506
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001507def process_cache_options(options, trim, **kwargs):
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001508 if options.cache:
Marc-Antoine Ruel34f5f282018-05-16 16:04:31 -04001509 policies = local_caching.CachePolicies(
1510 options.max_cache_size,
1511 options.min_free_space,
1512 options.max_items,
1513 # 3 weeks.
1514 max_age_secs=21*24*60*60)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001515
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001516 # |options.cache| path may not exist until DiskContentAddressedCache()
1517 # instance is created.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001518 server_ref = isolate_storage.ServerRef(
1519 options.isolate_server, options.namespace)
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001520 return local_caching.DiskContentAddressedCache(
Marc-Antoine Ruel3c979cb2015-03-11 13:43:28 -04001521 unicode(os.path.abspath(options.cache)),
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001522 policies,
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001523 server_ref.hash_algo(), # pylint: disable=not-callable
John Abd-El-Maleke3a85012018-05-29 20:10:44 -07001524 trim,
maruele6fc9382017-05-04 09:03:48 -07001525 **kwargs)
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001526 else:
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001527 return local_caching.MemoryContentAddressedCache()
Marc-Antoine Ruela57d7db2014-10-15 20:31:19 -04001528
1529
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001530class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001531 def __init__(self, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001532 logging_utils.OptionParserWithLogging.__init__(
Marc-Antoine Ruelac54cb42013-11-18 14:05:35 -05001533 self,
1534 version=__version__,
1535 prog=os.path.basename(sys.modules[__name__].__file__),
1536 **kwargs)
Vadim Shtayurae34e13a2014-02-02 11:23:26 -08001537 auth.add_auth_options(self)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001538
1539 def parse_args(self, *args, **kwargs):
Marc-Antoine Ruelf74cffe2015-07-15 15:21:34 -04001540 options, args = logging_utils.OptionParserWithLogging.parse_args(
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001541 self, *args, **kwargs)
Vadim Shtayura5d1efce2014-02-04 10:55:43 -08001542 auth.process_auth_options(self, options)
maruel@chromium.orgfb78d432013-08-28 21:22:40 +00001543 return options, args
1544
1545
def main(args):
  """Dispatches |args| to the matching CMD* handler in this module."""
  return subcommand.CommandDispatcher(__name__).execute(
      OptionParserIsolateServer(), args)
maruel@chromium.orgc6f90062012-11-07 18:32:22 +00001549
1550
if __name__ == '__main__':
  # One-time process environment setup before dispatching to the command
  # handlers: quiet OS-level error reporting, fix stdout/stderr encoding,
  # disable output buffering and enable colored terminal output.
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))