#!/usr/bin/env python
# Copyright 2013 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Archives a set of files or directories to an Isolate Server."""

__version__ = '0.9.0'

import collections
import errno
import functools
import logging
import optparse
import os
import Queue
import re
import signal
import stat
import sys
import tarfile
import tempfile
import threading
import time
import zlib

from third_party import colorama
from third_party.depot_tools import fix_encoding
from third_party.depot_tools import subcommand

from utils import file_path
from utils import fs
from utils import logging_utils
from utils import net
from utils import on_error
from utils import subprocess42
from utils import threading_utils
from utils import tools

import auth
import isolated_format
import isolate_storage
import local_caching


# Version of isolate protocol passed to the server in /handshake request.
ISOLATE_PROTOCOL_VERSION = '1.0'


# Maximum expected delay (in seconds) between successive file fetches or
# uploads in Storage. If it takes longer than that, a deadlock might be
# happening and all stack frames for all threads are dumped to log.
DEADLOCK_TIMEOUT = 5 * 60

# The number of files to check the isolate server per /pre-upload query.
# All files are sorted by likelihood of a change in the file content
# (currently file size is used to estimate this: the larger the file, the more
# likely it has changed). Then the first ITEMS_PER_CONTAINS_QUERIES[0] files
# are taken and sent to '/pre-upload', then the next
# ITEMS_PER_CONTAINS_QUERIES[1], and so on. The numbers here are a trade-off;
# the more per request, the lower the effect of HTTP round trip latency and
# TCP-level chattiness. On the other hand, larger values cause longer lookups,
# increasing the initial latency to start uploading, which is especially an
# issue for large files. This value is optimized for the "few thousand files
# to look up with a minimal number of large files missing" case.
ITEMS_PER_CONTAINS_QUERIES = (20, 20, 50, 50, 50, 100)

# A list of already compressed extension types that should not receive any
# compression before being uploaded.
ALREADY_COMPRESSED_TYPES = [
    '7z', 'avi', 'cur', 'gif', 'h264', 'jar', 'jpeg', 'jpg', 'mp4', 'pdf',
    'png', 'wav', 'zip',
]


# The delay (in seconds) to wait between logging statements when retrieving
# the required files. This is intended to let the user (or buildbot) know that
# the program is still running.
DELAY_BETWEEN_UPDATES_IN_SECS = 30


DEFAULT_BLACKLIST = (
  # Temporary vim or python files.
  r'^.+\.(?:pyc|swp)$',
  # .git or .svn directory.
  r'^(?:.+' + re.escape(os.path.sep) + r'|)\.(?:git|svn)$',
)

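# Illustrative sketch (not part of the original module): how a path is tested
# against DEFAULT_BLACKLIST. The file names below are hypothetical.
#
#   blacklist = [re.compile(r) for r in DEFAULT_BLACKLIST]
#   assert any(r.match('foo/bar.pyc') for r in blacklist)
#   assert any(r.match(os.path.join('repo', '.git')) for r in blacklist)
#   assert not any(r.match('foo/bar.py') for r in blacklist)
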
class Error(Exception):
  """Generic runtime error."""
  pass


class Aborted(Error):
  """Operation aborted."""
  pass


class AlreadyExists(Error):
  """File already exists."""


def file_read(path, chunk_size=isolated_format.DISK_FILE_CHUNK, offset=0):
  """Yields file content in chunks of |chunk_size| starting from |offset|."""
  with fs.open(path, 'rb') as f:
    if offset:
      f.seek(offset)
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      yield data

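# Illustrative sketch (not part of the original module): streaming a file's
# content with file_read. The path below is hypothetical.
#
#   total = 0
#   for chunk in file_read(u'/tmp/payload.bin', chunk_size=64 * 1024):
#     total += len(chunk)
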
def fileobj_path(fileobj):
  """Returns the file system path for a file-like object, or None.

  The returned path is guaranteed to exist and can be passed to file system
  operations like copy.
  """
  name = getattr(fileobj, 'name', None)
  if name is None:
    return

  # If the file-like object was created by something outside our control (for
  # example the standard library's open("test.txt")), name will be a str. We
  # want all our paths to be unicode objects, so we decode it.
  if not isinstance(name, unicode):
    # We incorrectly assume that UTF-8 is used everywhere.
    name = name.decode('utf-8')

  # fs.exists requires an absolute path, otherwise it will fail with an
  # assertion error.
  if not os.path.isabs(name):
    return

  if fs.exists(name):
    return name

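# Illustrative sketch (not part of the original module): fileobj_path returns
# a usable path only for real, absolute-path-backed file objects. The path
# below is hypothetical and assumed to exist.
#
#   with fs.open(u'/tmp/example.txt', 'rb') as f:
#     assert fileobj_path(f) == u'/tmp/example.txt'
#   import StringIO
#   assert fileobj_path(StringIO.StringIO('in-memory')) is None
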
# TODO(tansell): Replace fileobj_copy with shutil.copyfileobj once proper file
# wrappers have been created.
def fileobj_copy(
    dstfileobj, srcfileobj, size=-1,
    chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Copy data from srcfileobj to dstfileobj.

  Providing size means exactly that amount of data will be copied (if there
  isn't enough data, an IOError exception is thrown). Otherwise all data until
  the EOF marker will be copied.
  """
  if size == -1 and hasattr(srcfileobj, 'tell'):
    if srcfileobj.tell() != 0:
      raise IOError('partial file but not using size')

  written = 0
  while written != size:
    readsize = chunk_size
    if size > 0:
      readsize = min(readsize, size-written)
    data = srcfileobj.read(readsize)
    if not data:
      if size == -1:
        break
      raise IOError('partial file, got %s, wanted %s' % (written, size))
    dstfileobj.write(data)
    written += len(data)

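# Illustrative sketch (not part of the original module): copying between two
# in-memory file objects with fileobj_copy.
#
#   import StringIO
#   src = StringIO.StringIO('x' * 1000)
#   dst = StringIO.StringIO()
#   fileobj_copy(dst, src, size=1000)
#   assert dst.getvalue() == src.getvalue()
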
def putfile(srcfileobj, dstpath, file_mode=None, size=-1, use_symlink=False):
  """Puts srcfileobj at the given dstpath with the given mode.

  The function aims to do this as efficiently as possible while still allowing
  any possible file-like object to be given.

  Creating a tree of hardlinks has a few drawbacks:
  - tmpfs cannot be used for the scratch space. The tree has to be on the same
    partition as the cache.
  - involves a write to the inode, which advances ctime, causing a metadata
    writeback (and disk seeking).
  - cache ctime cannot be used to detect modifications / corruption.
  - Some file systems (NTFS) have a 64k limit on the number of hardlinks per
    partition. This is why the function automatically falls back to copying
    the file content.
  - /proc/sys/fs/protected_hardlinks causes an additional check to ensure all
    hardlinks have the same owner.
  - Anecdotal reports suggest ext2 can be faulty under a high rate of hardlink
    creation.

  Creating a tree of symlinks has a few drawbacks:
  - Tasks running the equivalent of os.path.realpath() will get the naked path
    and may fail.
  - Windows:
    - Symlinks are reparse points:
      https://msdn.microsoft.com/library/windows/desktop/aa365460.aspx
      https://msdn.microsoft.com/library/windows/desktop/aa363940.aspx
    - Symbolic links are Win32 paths, not NT paths.
      https://googleprojectzero.blogspot.com/2016/02/the-definitive-guide-on-win32-to-nt.html
    - Symbolic links are supported on Windows 7 and later only.
    - SeCreateSymbolicLinkPrivilege is needed, which is not present by
      default.
    - SeCreateSymbolicLinkPrivilege is *stripped off* by UAC when a restricted
      RID is present in the token;
      https://msdn.microsoft.com/en-us/library/bb530410.aspx
  """
  srcpath = fileobj_path(srcfileobj)
  if srcpath and size == -1:
    readonly = file_mode is None or not (
        file_mode & (stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH))

    if readonly:
      # If the file is read only we can link the file.
      if use_symlink:
        link_mode = file_path.SYMLINK_WITH_FALLBACK
      else:
        link_mode = file_path.HARDLINK_WITH_FALLBACK
    else:
      # If not read only, we must copy the file.
      link_mode = file_path.COPY

    file_path.link_file(dstpath, srcpath, link_mode)
  else:
    # Need to write out the file.
    with fs.open(dstpath, 'wb') as dstfileobj:
      fileobj_copy(dstfileobj, srcfileobj, size)

  assert fs.exists(dstpath)

  # file_mode of 0 is actually valid, so need explicit check.
  if file_mode is not None:
    fs.chmod(dstpath, file_mode)

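# Illustrative sketch (not part of the original module): materializing a cache
# entry at a destination path with putfile. The paths are hypothetical; with a
# read-only mode and a real source file, this links instead of copying.
#
#   with fs.open(u'/cache/deadbeef', 'rb') as srcfileobj:
#     putfile(srcfileobj, u'/out/data.bin', file_mode=0500)
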
def zip_compress(content_generator, level=7):
  """Reads chunks from |content_generator| and yields zip compressed chunks."""
  compressor = zlib.compressobj(level)
  for chunk in content_generator:
    compressed = compressor.compress(chunk)
    if compressed:
      yield compressed
  tail = compressor.flush(zlib.Z_FINISH)
  if tail:
    yield tail

def zip_decompress(
    content_generator, chunk_size=isolated_format.DISK_FILE_CHUNK):
  """Reads zipped data from |content_generator| and yields decompressed data.

  Decompresses data in small chunks (no larger than |chunk_size|) so that a
  zip bomb doesn't cause zlib to preallocate a huge amount of memory.

  Raises IOError if data is corrupted or incomplete.
  """
  decompressor = zlib.decompressobj()
  compressed_size = 0
  try:
    for chunk in content_generator:
      compressed_size += len(chunk)
      data = decompressor.decompress(chunk, chunk_size)
      if data:
        yield data
      while decompressor.unconsumed_tail:
        data = decompressor.decompress(decompressor.unconsumed_tail, chunk_size)
        if data:
          yield data
    tail = decompressor.flush()
    if tail:
      yield tail
  except zlib.error as e:
    raise IOError(
        'Corrupted zip stream (read %d bytes) - %s' % (compressed_size, e))
  # Ensure all data was read and decompressed.
  if decompressor.unused_data or decompressor.unconsumed_tail:
    raise IOError('Not all data was decompressed')

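# Illustrative sketch (not part of the original module): zip_compress and
# zip_decompress are inverses over a chunked stream.
#
#   chunks = ['hello ', 'isolate ', 'server']
#   compressed = list(zip_compress(iter(chunks), level=7))
#   assert ''.join(zip_decompress(iter(compressed))) == ''.join(chunks)
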
def _get_zip_compression_level(filename):
  """Given a filename calculates the ideal zip compression level to use."""
  file_ext = os.path.splitext(filename)[1].lower()
  # TODO(csharp): Profile to find what compression level works best.
  return 0 if file_ext in ALREADY_COMPRESSED_TYPES else 7


def create_directories(base_directory, files):
  """Creates the directory structure needed by the given list of files."""
  logging.debug('create_directories(%s, %d)', base_directory, len(files))
  # Creates the tree of directories to create.
  directories = set(os.path.dirname(f) for f in files)
  for item in list(directories):
    while item:
      directories.add(item)
      item = os.path.dirname(item)
  for d in sorted(directories):
    if d:
      abs_d = os.path.join(base_directory, d)
      if not fs.isdir(abs_d):
        fs.mkdir(abs_d)

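# Illustrative sketch (not part of the original module): create_directories
# builds every intermediate directory for a mapping of relative file paths
# (POSIX separators assumed here; the layout is hypothetical).
#
#   files = {'a/b/c.txt': {}, 'a/d.txt': {}}
#   create_directories(u'/tmp/outdir', files)
#   # /tmp/outdir/a and /tmp/outdir/a/b now exist.
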
def _create_symlinks(base_directory, files):
  """Creates any symlinks needed by the given set of files."""
  for filepath, properties in files:
    if 'l' not in properties:
      continue
    if sys.platform == 'win32':
      # TODO(maruel): Create symlink via the win32 api.
      logging.warning('Ignoring symlink %s', filepath)
      continue
    outfile = os.path.join(base_directory, filepath)
    try:
      os.symlink(properties['l'], outfile)  # pylint: disable=E1101
    except OSError as e:
      if e.errno == errno.EEXIST:
        raise AlreadyExists('File %s already exists.' % outfile)
      raise

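# Illustrative sketch (not part of the original module): each entry with an
# 'l' property describes a symlink and its target, as fetched from an
# .isolated file. The layout below is hypothetical.
#
#   files = {u'bin/python': {'l': u'python2.7'}}
#   _create_symlinks(u'/tmp/outdir', files.iteritems())
#   # /tmp/outdir/bin/python -> python2.7 (no-op with a warning on Windows).
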
class FileItem(isolate_storage.Item):
  """A file to push to Storage.

  Its digest and size may be provided in advance, if known. Otherwise they
  will be derived from the file content.
  """

  def __init__(self, path, digest=None, size=None, high_priority=False):
    super(FileItem, self).__init__(
        digest,
        size if size is not None else fs.stat(path).st_size,
        high_priority)
    self.path = path
    self.compression_level = _get_zip_compression_level(path)

  def content(self):
    return file_read(self.path)


class BufferItem(isolate_storage.Item):
  """A byte buffer to push to Storage."""

  def __init__(self, buf, high_priority=False):
    super(BufferItem, self).__init__(None, len(buf), high_priority)
    self.buffer = buf

  def content(self):
    return [self.buffer]

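# Illustrative sketch (not part of the original module): wrapping content for
# upload. The path is hypothetical; digests are computed lazily via prepare().
#
#   items = [
#       FileItem(u'/tmp/payload.bin'),
#       BufferItem('{"version": 1}', high_priority=True),
#   ]
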
class Storage(object):
  """Efficiently downloads or uploads large set of files via StorageApi.

  Implements compression support, parallel 'contains' checks, parallel uploads
  and more.

  Works only within a single namespace (and thus the hashing algorithm and
  compression scheme are fixed).

  Spawns multiple internal threads. Thread safe, but not fork safe. Modifies
  signal handlers table to handle Ctrl+C.
  """

  def __init__(self, storage_api):
    self._storage_api = storage_api
    self._cpu_thread_pool = None
    self._net_thread_pool = None
    self._aborted = False
    self._prev_sig_handlers = {}

  @property
  def server_ref(self):
    """Shortcut to get the server_ref from storage_api.

    This can be used to get the underlying hash_algo.
    """
    return self._storage_api.server_ref

  @property
  def cpu_thread_pool(self):
    """ThreadPool for CPU-bound tasks like zipping."""
    if self._cpu_thread_pool is None:
      threads = max(threading_utils.num_processors(), 2)
      if sys.maxsize <= 2L**32:
        # On 32 bits userland, do not try to use more than 16 threads.
        threads = min(threads, 16)
      self._cpu_thread_pool = threading_utils.ThreadPool(2, threads, 0, 'zip')
    return self._cpu_thread_pool

  @property
  def net_thread_pool(self):
    """AutoRetryThreadPool for IO-bound tasks, retries IOError."""
    if self._net_thread_pool is None:
      self._net_thread_pool = threading_utils.IOAutoRetryThreadPool()
    return self._net_thread_pool

  def close(self):
    """Waits for all pending tasks to finish."""
    logging.info('Waiting for all threads to die...')
    if self._cpu_thread_pool:
      self._cpu_thread_pool.join()
      self._cpu_thread_pool.close()
      self._cpu_thread_pool = None
    if self._net_thread_pool:
      self._net_thread_pool.join()
      self._net_thread_pool.close()
      self._net_thread_pool = None
    logging.info('Done.')

  def abort(self):
    """Cancels any pending or future operations."""
    # This is not strictly thread-safe, but in the worst case the logging
    # message will be printed twice. Not a big deal. In other places it is
    # assumed that unprotected reads and writes to _aborted are serializable
    # (it is true for Python) and thus no locking is used.
    if not self._aborted:
      logging.warning('Aborting... It can take a while.')
      self._aborted = True

  def __enter__(self):
    """Context manager interface."""
    assert not self._prev_sig_handlers, self._prev_sig_handlers
    for s in (signal.SIGINT, signal.SIGTERM):
      self._prev_sig_handlers[s] = signal.signal(s, lambda *_args: self.abort())
    return self

  def __exit__(self, _exc_type, _exc_value, _traceback):
    """Context manager interface."""
    self.close()
    while self._prev_sig_handlers:
      s, h = self._prev_sig_handlers.popitem()
      signal.signal(s, h)
    return False

  def upload_items(self, items):
    """Uploads a generator of Item to the isolate server.

    It figures out what items are missing from the server and uploads only
    them.

    It uses 3 threads internally:
    - One to create batches based on a timeout
    - One to dispatch the /contains RPC and field the missing entries
    - One to field the /push RPC

    The main thread enumerates 'items' and pushes to the first thread. Then it
    join()s all the threads, waiting for them to complete.

    (enumerate items of Item, this can be slow as disk is traversed)
          |
          v
    _create_items_batches_thread                      Thread #1
    (generates list(Item), every 3s or 20~100 items)
          |
          v
    _do_lookups_thread                                Thread #2
         |        |
         v        v
    (missing)  (was on server)
         |
         v
    _handle_missing_thread                            Thread #3
          |
          v
    (upload Item, append to uploaded)

    Arguments:
      items: list of isolate_storage.Item instances that represent data to
          upload.

    Returns:
      List of items that were uploaded. All other items are already there.
    """
    incoming = Queue.Queue()
    batches_to_lookup = Queue.Queue()
    missing = Queue.Queue()
    uploaded = []

    def _create_items_batches_thread():
      """Creates batches for /contains RPC lookup from individual items.

      Input:  incoming
      Output: batches_to_lookup
      """
      try:
        batch_size_index = 0
        batch_size = ITEMS_PER_CONTAINS_QUERIES[batch_size_index]
        batch = []
        while not self._aborted:
          try:
            item = incoming.get(True, timeout=3)
            if item:
              batch.append(item)
          except Queue.Empty:
            item = False
          if len(batch) == batch_size or (not item and batch):
            if len(batch) == batch_size:
              batch_size_index += 1
              batch_size = ITEMS_PER_CONTAINS_QUERIES[
                  min(batch_size_index, len(ITEMS_PER_CONTAINS_QUERIES)-1)]
            batches_to_lookup.put(batch)
            batch = []
          if item is None:
            break
      finally:
        # Unblock the next pipeline.
        batches_to_lookup.put(None)

    def _do_lookups_thread():
      """Enqueues all the /contains RPCs and emits the missing items.

      Input:  batches_to_lookup
      Output: missing
      """
      try:
        channel = threading_utils.TaskChannel()
        def _contains(b):
          if self._aborted:
            raise Aborted()
          return self._storage_api.contains(b)

        pending_contains = 0
        while not self._aborted:
          batch = batches_to_lookup.get()
          if batch is None:
            break
          self.net_thread_pool.add_task_with_channel(
              channel, threading_utils.PRIORITY_HIGH, _contains, batch)
          pending_contains += 1
          while pending_contains and not self._aborted:
            try:
              v = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            pending_contains -= 1
            for missing_item, push_state in v.iteritems():
              missing.put((missing_item, push_state))
        while pending_contains and not self._aborted:
          for missing_item, push_state in channel.next().iteritems():
            missing.put((missing_item, push_state))
          pending_contains -= 1
      finally:
        # Unblock the next pipeline.
        missing.put((None, None))

    def _handle_missing_thread():
      """Sends the missing items to the uploader.

      Input:  missing
      Output: uploaded
      """
      with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
        channel = threading_utils.TaskChannel()
        pending_upload = 0
        while not self._aborted:
          try:
            missing_item, push_state = missing.get(True, timeout=5)
            if missing_item is None:
              break
            self._async_push(channel, missing_item, push_state)
            pending_upload += 1
          except Queue.Empty:
            pass
          detector.ping()
          while not self._aborted and pending_upload:
            try:
              item = channel.next(timeout=0)
            except threading_utils.TaskChannel.Timeout:
              break
            uploaded.append(item)
            pending_upload -= 1
            logging.debug(
                'Uploaded %d; %d pending: %s (%d)',
                len(uploaded), pending_upload, item.digest, item.size)
        while not self._aborted and pending_upload:
          item = channel.next()
          uploaded.append(item)
          pending_upload -= 1
          logging.debug(
              'Uploaded %d; %d pending: %s (%d)',
              len(uploaded), pending_upload, item.digest, item.size)

    threads = [
      threading.Thread(target=_create_items_batches_thread),
      threading.Thread(target=_do_lookups_thread),
      threading.Thread(target=_handle_missing_thread),
    ]
    for t in threads:
      t.start()

    try:
      # For each digest keep only the first isolate_storage.Item that matches
      # it. All other items are just indistinguishable copies from the point
      # of view of the isolate server (it doesn't care about paths at all,
      # only content and digests).
      seen = {}
      try:
        # TODO(maruel): Reorder the items as a priority queue, with larger
        # items being processed first. This is, before hashing the data.
        # This must be done in the primary thread since items can be a
        # generator.
        for item in items:
          # This is I/O heavy.
          item.prepare(self.server_ref.hash_algo)
          if seen.setdefault(item.digest, item) is item:
            incoming.put(item)
      finally:
        incoming.put(None)
    finally:
      for t in threads:
        t.join()

    logging.info('All %s files are uploaded', len(uploaded))
    if seen:
      _print_upload_stats(seen.values(), uploaded)
    return uploaded

  def _async_push(self, channel, item, push_state):
    """Starts asynchronous push to the server in a parallel thread.

    Can be used only after |item| was checked for presence on a server with a
    /contains RPC.

    Arguments:
      channel: TaskChannel that receives back |item| when upload ends.
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      None, but |channel| later receives back |item| when upload ends.
    """
    # Thread pool task priority.
    priority = (
        threading_utils.PRIORITY_HIGH if item.high_priority
        else threading_utils.PRIORITY_MED)

    def _push(content):
      """Pushes an isolate_storage.Item and returns it to |channel|."""
      if self._aborted:
        raise Aborted()
      item.prepare(self.server_ref.hash_algo)
      self._storage_api.push(item, push_state, content)
      return item

    # If zipping is not required, just start a push task. Don't pass 'content'
    # so that it can create a new generator when it retries on failures.
    if not self.server_ref.is_with_compression:
      self.net_thread_pool.add_task_with_channel(channel, priority, _push, None)
      return

    # If zipping is enabled, zip in a separate thread.
    def zip_and_push():
      # TODO(vadimsh): Implement streaming uploads. Before it's done, assemble
      # content right here. It will block until the whole file is zipped.
      try:
        if self._aborted:
          raise Aborted()
        stream = zip_compress(item.content(), item.compression_level)
        data = ''.join(stream)
      except Exception as exc:
        logging.error('Failed to zip \'%s\': %s', item, exc)
        channel.send_exception()
        return
      # Pass '[data]' explicitly because the compressed data is not the same
      # as the one provided by 'item'. Since '[data]' is a list, it can safely
      # be reused during retries.
      self.net_thread_pool.add_task_with_channel(
          channel, priority, _push, [data])
    self.cpu_thread_pool.add_task(priority, zip_and_push)

  def push(self, item, push_state):
    """Synchronously pushes a single item to the server.

    If you need to push many items at once, consider using 'upload_items' or
    '_async_push' with an instance of TaskChannel.

    Arguments:
      item: item to upload as instance of isolate_storage.Item class.
      push_state: push state returned by storage_api.contains(). It contains
          storage specific information describing how to upload the item (for
          example in case of cloud storage, it is signed upload URLs).

    Returns:
      Pushed item (same object as |item|).
    """
    channel = threading_utils.TaskChannel()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT):
      self._async_push(channel, item, push_state)
      pushed = channel.next()
      assert pushed is item
    return item

  def async_fetch(self, channel, priority, digest, size, sink):
    """Starts asynchronous fetch from the server in a parallel thread.

    Arguments:
      channel: TaskChannel that receives back |digest| when download ends.
      priority: thread pool task priority for the fetch.
      digest: hex digest of an item to download.
      size: expected size of the item (after decompression).
      sink: function that will be called as sink(generator).
    """
    def fetch():
      try:
        # Prepare reading pipeline.
        stream = self._storage_api.fetch(digest, size, 0)
        if self.server_ref.is_with_compression:
          stream = zip_decompress(stream, isolated_format.DISK_FILE_CHUNK)
        # Run |stream| through verifier that will assert its size.
        verifier = FetchStreamVerifier(
            stream, self.server_ref.hash_algo, digest, size)
        # Verified stream goes to |sink|.
        sink(verifier.run())
      except Exception as err:
        logging.error('Failed to fetch %s: %s', digest, err)
        raise
      return digest

    # Don't bother with zip_thread_pool for decompression. Decompression is
    # really fast and most probably IO bound anyway.
    self.net_thread_pool.add_task_with_channel(channel, priority, fetch)


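# Illustrative sketch (not part of the original module): a typical archive
# flow through Storage.upload_items. The server URL, namespace and file path
# are hypothetical, assuming isolate_storage.ServerRef(url, namespace).
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolateserver.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     uploaded = storage.upload_items([FileItem(u'/tmp/payload.bin')])
#     # 'uploaded' holds only the items the server did not already have.
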
class FetchQueue(object):
  """Fetches items from Storage and places them into ContentAddressedCache.

  It manages multiple concurrent fetch operations. Acts as a bridge between
  Storage and ContentAddressedCache so that Storage and ContentAddressedCache
  don't depend on each other at all.
  """

  def __init__(self, storage, cache):
    self.storage = storage
    self.cache = cache
    self._channel = threading_utils.TaskChannel()
    self._pending = set()
    self._accessed = set()
    self._fetched = set(cache)
    # Pending digests that the caller waits for, see wait_on()/wait().
    self._waiting_on = set()
    # Already fetched digests the caller waits for which are not yet returned
    # by wait().
    self._waiting_on_ready = set()

  def add(
      self,
      digest,
      size=local_caching.UNKNOWN_FILE_SIZE,
      priority=threading_utils.PRIORITY_MED):
    """Starts asynchronous fetch of item |digest|."""
    # Fetching it now?
    if digest in self._pending:
      return

    # Mark this file as in use, verify_all_cached will later ensure it is
    # still in cache.
    self._accessed.add(digest)

    # Already fetched? Notify cache to update item's LRU position.
    if digest in self._fetched:
      # 'touch' returns True if item is in cache and not corrupted.
      if self.cache.touch(digest, size):
        return
      logging.error('%s is corrupted', digest)
      self._fetched.remove(digest)

    # TODO(maruel): It should look at the free disk space, the current cache
    # size and the size of the new item on every new item:
    # - Trim the cache as more entries are listed when free disk space is low,
    #   otherwise if the amount of data downloaded during the run > free disk
    #   space, it'll crash.
    # - Make sure there's enough free disk space to fit all dependencies of
    #   this run! If not, abort early.

    # Start fetching.
    self._pending.add(digest)
    self.storage.async_fetch(
        self._channel, priority, digest, size,
        functools.partial(self.cache.write, digest))

  def wait_on(self, digest):
    """Updates digests to be waited on by 'wait'."""
    # Calculate once the already fetched items. These will be retrieved first.
    if digest in self._fetched:
      self._waiting_on_ready.add(digest)
    else:
      self._waiting_on.add(digest)

  def wait(self):
    """Waits until any of the waited-on items is retrieved.

    Once this happens, it is removed from the waited-on set and returned.

    This function is called in two waves. In the first wave it is called for
    the HIGH priority items, the isolated files themselves. In the second wave
    it is called for all the files.

    If the waited-on set is empty, raises RuntimeError.
    """
    # Flush any already fetched items.
    if self._waiting_on_ready:
      return self._waiting_on_ready.pop()

    assert self._waiting_on, 'Needs items to wait on'

    # Wait for one waited-on item to be fetched.
    while self._pending:
      digest = self._channel.next()
      self._pending.remove(digest)
      self._fetched.add(digest)
      if digest in self._waiting_on:
        self._waiting_on.remove(digest)
        return digest

    # Should never reach this point due to assert above.
    raise RuntimeError('Impossible state')

  @property
  def wait_queue_empty(self):
    """Returns True if there is no digest left for wait() to return."""
    return not self._waiting_on and not self._waiting_on_ready

  def inject_local_file(self, path, algo):
    """Adds local file to the cache as if it was fetched from storage."""
    with fs.open(path, 'rb') as f:
      data = f.read()
    digest = algo(data).hexdigest()
    self.cache.write(digest, [data])
    self._fetched.add(digest)
    return digest

  @property
  def pending_count(self):
    """Returns number of items to be fetched."""
    return len(self._pending)

  def verify_all_cached(self):
    """True if all accessed items are in cache."""
    # Not thread safe, but called after all work is done.
    return self._accessed.issubset(self.cache)

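# Illustrative sketch (not part of the original module): the FetchQueue usage
# pattern, as seen in fetch_isolated() below. 'storage', 'cache' and 'wanted'
# (a digest -> size mapping) are hypothetical.
#
#   fetch_queue = FetchQueue(storage, cache)
#   for digest, size in wanted.iteritems():
#     fetch_queue.add(digest, size)
#     fetch_queue.wait_on(digest)
#   while not fetch_queue.wait_queue_empty:
#     digest = fetch_queue.wait()  # Returns one fetched digest at a time.
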
class FetchStreamVerifier(object):
  """Verifies that a fetched file is valid before passing it to the
  ContentAddressedCache.
  """

  def __init__(self, stream, hasher, expected_digest, expected_size):
    """Initializes the verifier.

    Arguments:
    * stream: an iterable yielding chunks of content
    * hasher: an object from hashlib that supports update() and hexdigest()
      (eg, hashlib.sha1).
    * expected_digest: if the entire stream is piped through hasher and then
      summarized via hexdigest(), this should be the result. That is, it
      should be a hex string like 'abc123'.
    * expected_size: either the expected size of the stream, or
      local_caching.UNKNOWN_FILE_SIZE.
    """
    assert stream is not None
    self.stream = stream
    self.expected_digest = expected_digest
    self.expected_size = expected_size
    self.current_size = 0
    self.rolling_hash = hasher()

  def run(self):
    """Generator that yields the same items as |stream|.

    Verifies |stream| is complete before yielding a last chunk to consumer.

    Also wraps IOError produced by consumer into MappingError exceptions since
    otherwise Storage will retry fetch on unrelated local cache errors.
    """
    # Read one chunk ahead, keep it in |stored|.
    # That way a complete stream can be verified before pushing last chunk
    # to consumer.
    stored = None
    for chunk in self.stream:
      assert chunk is not None
      if stored is not None:
        self._inspect_chunk(stored, is_last=False)
        try:
          yield stored
        except IOError as exc:
          raise isolated_format.MappingError(
              'Failed to store an item in cache: %s' % exc)
      stored = chunk
    if stored is not None:
      self._inspect_chunk(stored, is_last=True)
      try:
        yield stored
      except IOError as exc:
        raise isolated_format.MappingError(
            'Failed to store an item in cache: %s' % exc)

  def _inspect_chunk(self, chunk, is_last):
    """Called for each fetched chunk before passing it to consumer."""
    self.current_size += len(chunk)
    self.rolling_hash.update(chunk)
    if not is_last:
      return

    if ((self.expected_size != local_caching.UNKNOWN_FILE_SIZE) and
        (self.expected_size != self.current_size)):
      msg = 'Incorrect file size: want %d, got %d' % (
          self.expected_size, self.current_size)
      raise IOError(msg)

    actual_digest = self.rolling_hash.hexdigest()
    if self.expected_digest != actual_digest:
      msg = 'Incorrect digest: want %s, got %s' % (
          self.expected_digest, actual_digest)
      raise IOError(msg)

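# Illustrative sketch (not part of the original module): verifying a chunked
# stream against its expected digest and size.
#
#   import hashlib
#   payload = 'some blob'
#   verifier = FetchStreamVerifier(
#       iter([payload]), hashlib.sha1,
#       hashlib.sha1(payload).hexdigest(), len(payload))
#   assert ''.join(verifier.run()) == payload
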
class IsolatedBundle(object):
  """Fetched and parsed .isolated file with all dependencies."""

  def __init__(self, filter_cb):
    """
    filter_cb: callback function to filter downloaded content.
               When filter_cb is not None, an isolated file is downloaded iff
               filter_cb(filepath) returns True.
    """

    self.command = []
    self.files = {}
    self.read_only = None
    self.relative_cwd = None
    # The main .isolated file, a IsolatedFile instance.
    self.root = None

    self._filter_cb = filter_cb

  def fetch(self, fetch_queue, root_isolated_hash, algo):
    """Fetches the .isolated and all the included .isolated.

    It enables support for "included" .isolated files. They are processed in
    strict order but fetched asynchronously from the cache. This is important
    so that a file in an included .isolated file that is overridden by an
    embedding .isolated file is not fetched needlessly. The includes are
    fetched in one pass and the files are fetched as soon as all the ones on
    the left-side of the tree have been fetched.

    The prioritization is very important here for nested .isolated files.
    'includes' have the highest priority and the algorithm is optimized for
    both deep and wide trees. A deep one is a long link of .isolated files
    referenced one at a time by one item in 'includes'. A wide one has a large
    number of 'includes' in a single .isolated file. 'left' is defined as an
    included .isolated file earlier in the 'includes' list. So the order of
    the elements in 'includes' is important.

    As a side effect this method starts asynchronous fetch of all data files
    by adding them to |fetch_queue|. It doesn't wait for data files to finish
    fetching though.
    """
    self.root = isolated_format.IsolatedFile(root_isolated_hash, algo)

    # Isolated files being retrieved now: hash -> IsolatedFile instance.
    pending = {}
    # Set of hashes of already retrieved items to refuse recursive includes.
    seen = set()
    # Set of IsolatedFile's whose data files have already been fetched.
    processed = set()

    def retrieve_async(isolated_file):
      """Retrieves an isolated file included by the root bundle."""
      h = isolated_file.obj_hash
      if h in seen:
        raise isolated_format.IsolatedError(
            'IsolatedFile %s is retrieved recursively' % h)
      assert h not in pending
      seen.add(h)
      pending[h] = isolated_file
      # This isolated item is being added dynamically, notify FetchQueue.
      fetch_queue.wait_on(h)
      fetch_queue.add(h, priority=threading_utils.PRIORITY_HIGH)

    # Start fetching root *.isolated file (single file, not the whole bundle).
    retrieve_async(self.root)

    while pending:
      # Wait until some *.isolated file is fetched, parse it.
      item_hash = fetch_queue.wait()
      item = pending.pop(item_hash)
      with fetch_queue.cache.getfileobj(item_hash) as f:
        item.load(f.read())

      # Start fetching included *.isolated files.
      for new_child in item.children:
        retrieve_async(new_child)

      # Always fetch *.isolated files in traversal order, waiting if necessary
      # until next to-be-processed node loads. "Waiting" is done by yielding
      # back to the outer loop, that waits until some *.isolated is loaded.
      for node in isolated_format.walk_includes(self.root):
        if node not in processed:
          # Not visited, and not yet loaded -> wait for it to load.
          if not node.is_loaded:
            break
          # Not visited and loaded -> process it and continue the traversal.
          self._start_fetching_files(node, fetch_queue)
          processed.add(node)

    # All *.isolated files should be processed by now and only them.
    all_isolateds = set(isolated_format.walk_includes(self.root))
    assert all_isolateds == processed, (all_isolateds, processed)
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

    # Extract 'command' and other bundle properties.
    for node in isolated_format.walk_includes(self.root):
      self._update_self(node)
    self.relative_cwd = self.relative_cwd or ''

  def _start_fetching_files(self, isolated, fetch_queue):
    """Starts fetching files from |isolated| that are not yet being fetched.

    Modifies self.files.
    """
    files = isolated.data.get('files', {})
    logging.debug('fetch_files(%s, %d)', isolated.obj_hash, len(files))
    for filepath, properties in files.iteritems():
      if self._filter_cb and not self._filter_cb(filepath):
        continue

      # Root isolated has priority on the files being mapped. In particular,
      # overridden files must not be fetched.
      if filepath not in self.files:
        self.files[filepath] = properties

        # Make sure if the isolated is read only, the mode doesn't have write
        # bits.
        if 'm' in properties and self.read_only:
          properties['m'] &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)

        # Preemptively request hashed files.
        if 'h' in properties:
          fetch_queue.add(
              properties['h'], properties['s'], threading_utils.PRIORITY_MED)

  def _update_self(self, node):
    """Extracts bundle global parameters from loaded *.isolated file.

    Will be called with each loaded *.isolated file in order of traversal of
    isolated include graph (see isolated_format.walk_includes).
    """
    # Grabs properties.
    if not self.command and node.data.get('command'):
      # Ensure paths are correctly separated on windows.
      self.command = node.data['command']
      if self.command:
        self.command[0] = self.command[0].replace('/', os.path.sep)
    if self.read_only is None and node.data.get('read_only') is not None:
      self.read_only = node.data['read_only']
    if (self.relative_cwd is None and
        node.data.get('relative_cwd') is not None):
      self.relative_cwd = node.data['relative_cwd']

def get_storage(server_ref):
  """Returns Storage class that can upload and download from |namespace|.

  Arguments:
    server_ref: isolate_storage.ServerRef instance.

  Returns:
    Instance of Storage.
  """
  assert isinstance(server_ref, isolate_storage.ServerRef), repr(server_ref)
  return Storage(isolate_storage.get_storage_api(server_ref))

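# Illustrative sketch (not part of the original module): pairing get_storage
# with fetch_isolated() below to download a tree. The URL, namespace, cache
# and digest are hypothetical, assuming isolate_storage.ServerRef(url,
# namespace) and a local_caching content-addressed cache instance.
#
#   server_ref = isolate_storage.ServerRef(
#       'https://isolateserver.example.com', 'default-gzip')
#   with get_storage(server_ref) as storage:
#     bundle = fetch_isolated(
#         u'deadbeef' * 5, storage, cache, u'/tmp/outdir', use_symlinks=False)
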
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001078def fetch_isolated(isolated_hash, storage, cache, outdir, use_symlinks,
1079 filter_cb=None):
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001080 """Aggressively downloads the .isolated file(s), then download all the files.
maruel@chromium.org4f2ebe42013-09-19 13:09:08 +00001081
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001082 Arguments:
1083 isolated_hash: hash of the root *.isolated file.
1084 storage: Storage class that communicates with isolate storage.
Marc-Antoine Ruel2666d9c2018-05-18 13:52:02 -04001085 cache: ContentAddressedCache class that knows how to store and map files
1086 locally.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001087 outdir: Output directory to map file tree to.
maruel4409e302016-07-19 14:25:51 -07001088 use_symlinks: Use symlinks instead of hardlinks when True.
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001089 filter_cb: filter that works as whitelist for downloaded files.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001090
1091 Returns:
Vadim Shtayura7f7459c2014-09-04 13:25:10 -07001092 IsolatedBundle object that holds details about loaded *.isolated file.
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001093 """
Marc-Antoine Ruel4e8cd182014-06-18 13:27:17 -04001094 logging.debug(
maruel4409e302016-07-19 14:25:51 -07001095 'fetch_isolated(%s, %s, %s, %s, %s)',
1096 isolated_hash, storage, cache, outdir, use_symlinks)
Vadim Shtayurae0ab1902014-04-29 10:55:27 -07001097 # Hash algorithm to use, defined by namespace |storage| is using.
Marc-Antoine Ruelb8513132018-11-20 19:48:53 +00001098 algo = storage.server_ref.hash_algo
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001099 fetch_queue = FetchQueue(storage, cache)
Takuto Ikuta1e6072c2018-11-06 20:42:43 +00001100 bundle = IsolatedBundle(filter_cb)
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001101
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001102 with tools.Profiler('GetIsolateds'):
1103 # Optionally support local files by manually adding them to cache.
1104 if not isolated_format.is_valid_hash(isolated_hash, algo):
1105 logging.debug('%s is not a valid hash, assuming a file '
1106 '(algo was %s, hash size was %d)',
1107 isolated_hash, algo(), algo().digest_size)
1108 path = unicode(os.path.abspath(isolated_hash))
1109 try:
1110 isolated_hash = fetch_queue.inject_local_file(path, algo)
1111 except IOError as e:
1112 raise isolated_format.MappingError(
1113 '%s doesn\'t seem to be a valid file. Did you intent to pass a '
1114 'valid hash (error: %s)?' % (isolated_hash, e))
vadimsh@chromium.org7b5dae32013-10-03 16:59:59 +00001115
Marc-Antoine Ruele79ddbf2018-06-13 18:33:07 +00001116 # Load all *.isolated and start loading rest of the files.
1117 bundle.fetch(fetch_queue, isolated_hash, algo)

  with tools.Profiler('GetRest'):
    # Create file system hierarchy.
    file_path.ensure_tree(outdir)
    create_directories(outdir, bundle.files)
    _create_symlinks(outdir, bundle.files.iteritems())

    # Ensure working directory exists.
    cwd = os.path.normpath(os.path.join(outdir, bundle.relative_cwd))
    file_path.ensure_tree(cwd)

    # Multimap: digest -> list of pairs (path, props).
    remaining = {}
    for filepath, props in bundle.files.iteritems():
      if 'h' in props:
        remaining.setdefault(props['h'], []).append((filepath, props))
        fetch_queue.wait_on(props['h'])

    # Now block on the remaining files to be downloaded and mapped.
    logging.info('Retrieving remaining files (%d of them)...',
                 fetch_queue.pending_count)
    last_update = time.time()
    with threading_utils.DeadlockDetector(DEADLOCK_TIMEOUT) as detector:
      while remaining:
        detector.ping()

        # Wait for any item to finish fetching to cache.
        digest = fetch_queue.wait()

        # Create the files in the destination using the item in cache as the
        # source.
        for filepath, props in remaining.pop(digest):
          fullpath = os.path.join(outdir, filepath)

          with cache.getfileobj(digest) as srcfileobj:
            filetype = props.get('t', 'basic')

            if filetype == 'basic':
              # Ignore all bits apart from the user's.
              file_mode = (props.get('m') or 0500) & 0700
              if bundle.read_only:
                # Enforce read-only if the root bundle does.
                file_mode &= 0500
              putfile(
                  srcfileobj, fullpath, file_mode,
                  use_symlink=use_symlinks)

            elif filetype == 'tar':
              basedir = os.path.dirname(fullpath)
              with tarfile.TarFile(fileobj=srcfileobj, encoding='utf-8') as t:
                for ti in t:
                  if not ti.isfile():
                    logging.warning(
                        'Path(%r) is nonfile (%s), skipped',
                        ti.name, ti.type)
                    continue
                  # Handle files created on Windows fetched on POSIX and the
                  # reverse.
                  other_sep = '/' if os.path.sep == '\\' else '\\'
                  name = ti.name.replace(other_sep, os.path.sep)
                  fp = os.path.normpath(os.path.join(basedir, name))
                  if not fp.startswith(basedir):
                    logging.error(
                        'Path(%r) is outside root directory',
                        fp)
                    # Skip entries that would escape the destination
                    # directory.
                    continue
                  ifd = t.extractfile(ti)
                  file_path.ensure_tree(os.path.dirname(fp))
                  file_mode = ti.mode & 0700
                  if bundle.read_only:
                    # Enforce read-only if the root bundle does.
                    file_mode &= 0500
                  putfile(ifd, fp, file_mode, ti.size)

            else:
              raise isolated_format.IsolatedError(
                  'Unknown file type %r' % filetype)

        # Report progress.
        duration = time.time() - last_update
        if duration > DELAY_BETWEEN_UPDATES_IN_SECS:
          msg = '%d files remaining...' % len(remaining)
          sys.stdout.write(msg + '\n')
          sys.stdout.flush()
          logging.info(msg)
          last_update = time.time()
    assert fetch_queue.wait_queue_empty, 'FetchQueue should have been emptied'

  # Save the cache right away to not lose the state of the new objects.
  cache.save()
  # Cache could evict some items we just tried to fetch, it's a fatal error.
  if not fetch_queue.verify_all_cached():
    free_disk = file_path.get_free_space(cache.cache_dir)
    msg = (
        'Cache is too small to hold all requested files.\n'
        '  %s\n  cache=%d bytes, %d items; %s bytes free_space') % (
            cache.policies, cache.total_size, len(cache), free_disk)
    raise isolated_format.MappingError(msg)
  return bundle


def _directory_to_metadata(root, algo, blacklist):
  """Yields every file and/or symlink found.

  Yields:
    tuple(FileItem, relpath, metadata)
    For a symlink, FileItem is None.
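
  For example, a regular file could yield something like
  (FileItem(...), u'foo/bar.txt', {'h': <digest>, 's': 123, 'm': 0640}) while
  a symlink yields (None, u'foo/link', {'l': u'target'}); the exact metadata
  keys are defined by isolated_format.file_to_metadata().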
  """
  root = file_path.get_native_path_case(root)
  for relpath in isolated_format.expand_directory_and_symlink(
      root,
      u'.' + os.path.sep,
      blacklist,
      follow_symlinks=(sys.platform != 'win32')):
    # Immediately hash the file. We need the hash to construct the isolated
    # file.
    # TODO(maruel): This should be done lazily?
    meta = isolated_format.file_to_metadata(
        os.path.join(root, relpath), {}, 0, algo, False)
    meta.pop('t')
    item = None
    if 'h' in meta:
      item = FileItem(
          path=os.path.join(root, relpath),
          digest=meta['h'],
          size=meta['s'],
          high_priority=relpath.endswith('.isolated'))
    yield item, relpath, meta


def _print_upload_stats(items, missing):
  """Prints upload stats."""
  total = len(items)
  total_size = sum(f.size for f in items)
  logging.info(
      'Total:      %6d, %9.1fkiB', total, total_size / 1024.)
  cache_hit = set(items).difference(missing)
  cache_hit_size = sum(f.size for f in cache_hit)
  logging.info(
      'cache hit:  %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_hit),
      cache_hit_size / 1024.,
      len(cache_hit) * 100. / total,
      cache_hit_size * 100. / total_size if total_size else 0)
  cache_miss = missing
  cache_miss_size = sum(f.size for f in cache_miss)
  logging.info(
      'cache miss: %6d, %9.1fkiB, %6.2f%% files, %6.2f%% size',
      len(cache_miss),
      cache_miss_size / 1024.,
      len(cache_miss) * 100. / total,
      cache_miss_size * 100. / total_size if total_size else 0)


def _enqueue_dir(dirpath, blacklist, tempdir, hash_algo, hash_algo_name):
  """Called by archive_files_to_storage for a directory.

  Creates an .isolated file describing the directory's content.

  Yields:
    FileItem for every file found, plus one for the isolated file itself.
  """
  files = {}
  for item, relpath, meta in _directory_to_metadata(
      dirpath, hash_algo, blacklist):
    # item is None for a symlink.
    files[relpath] = meta
    if item:
      yield item

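  # The .isolated payload is a plain dict serialized by save_isolated(); a
  # minimal sketch, assuming SHA-1 and a single file on POSIX, looks like:
  #   {'algo': 'sha-1',
  #    'files': {u'a.txt': {'h': <digest>, 's': 3, 'm': 0640}},
  #    'version': isolated_format.ISOLATED_FILE_VERSION}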
  data = {
      'algo': hash_algo_name,
      'files': files,
      'version': isolated_format.ISOLATED_FILE_VERSION,
  }
  # TODO(maruel): Stop putting to disk.
  handle, isolated = tempfile.mkstemp(dir=tempdir, suffix=u'.isolated')
  os.close(handle)
  isolated_format.save_isolated(isolated, data)
  yield FileItem(
      path=isolated,
      digest=isolated_format.hash_file(isolated, hash_algo),
      size=fs.stat(isolated).st_size,
      high_priority=True)


def archive_files_to_storage(storage, files, blacklist):
  """Stores every entry into remote storage and returns stats.

  Arguments:
    storage: a Storage object that communicates with the remote object store.
    files: iterable of files to upload. If a directory is specified (with a
        trailing slash), a .isolated file is created and its hash is returned.
        Duplicates are skipped.
    blacklist: function that returns True if a file should be omitted.

  Returns:
    tuple(OrderedDict(path: hash), list(FileItem cold), list(FileItem hot)).
    The first file in the first item is always the isolated file.
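
  Example (a minimal sketch; assumes the directory u'out/' exists and nothing
  is blacklisted):
    with get_storage(server_ref) as storage:
      results, cold, hot = archive_files_to_storage(
          storage, [u'out/'], lambda _path: False)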
  """
  # Dict of path to hash.
  results = collections.OrderedDict()
  hash_algo = storage.server_ref.hash_algo
  hash_algo_name = storage.server_ref.hash_algo_name
  # TODO(maruel): Stop needing a temporary directory.
  tempdir = tempfile.mkdtemp(prefix=u'isolateserver')
  try:
    # Generator of FileItem to pass to upload_items() concurrent operation.
    channel = threading_utils.TaskChannel()
    uploaded_digests = set()
    def _upload_items():
      # Named 'uploaded' to avoid shadowing the outer 'results' dict.
      uploaded = storage.upload_items(channel)
      uploaded_digests.update(f.digest for f in uploaded)
    t = threading.Thread(target=_upload_items)
    t.start()
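    # From here on, the main thread acts as the producer, hashing files and
    # feeding FileItem objects to |channel|, while |t| is the consumer,
    # uploading whatever the server reports as missing.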

    # Keep track locally of the items to determine cold and hot items.
    items_found = []
    try:
      for f in files:
        assert isinstance(f, unicode), repr(f)
        if f in results:
          # Duplicate
          continue
        try:
          h = None
          filepath = os.path.abspath(f)
          if fs.isdir(filepath):
            # Uploading a whole directory.
            for item in _enqueue_dir(
                filepath, blacklist, tempdir, hash_algo, hash_algo_name):
              channel.send_result(item)
              items_found.append(item)
              # The very last item will be the isolated file.
              h = item.digest
          elif fs.isfile(filepath):
            h = isolated_format.hash_file(filepath, hash_algo)
            item = FileItem(
                path=filepath,
                digest=h,
                size=fs.stat(filepath).st_size,
                high_priority=f.endswith('.isolated'))
            channel.send_result(item)
            items_found.append(item)
          else:
            raise Error('%s is neither a file nor a directory.' % f)
          results[f] = h
        except OSError:
          raise Error('Failed to process %s.' % f)
    finally:
      # Stops the generator, so _upload_items() can exit.
      channel.send_done()
      t.join()

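    # 'cold' items are those the server reported missing and thus actually
    # uploaded; 'hot' items were already present server-side.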
    cold = []
    hot = []
    for i in items_found:
      # Note that multiple FileItem may have the same .digest.
      if i.digest in uploaded_digests:
        cold.append(i)
      else:
        hot.append(i)
    return results, cold, hot
  finally:
    if tempdir and fs.isdir(tempdir):
      file_path.rmtree(tempdir)


@subcommand.usage('<file1..fileN> or - to read from stdin')
def CMDarchive(parser, args):
  """Archives data to the server.

  If a directory is specified, a .isolated file is created and the whole
  directory is uploaded. Then this .isolated file can be included in another
  one to run commands.

  The command outputs each file that was processed with its content hash. For
  directories, the .isolated generated for the directory is listed as the
  directory entry itself.
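
  Example (a sketch; the server URL is hypothetical):
    isolateserver.py archive -I https://isolate.example.com out/data.txt out/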
  """
  add_isolate_server_options(parser)
  add_archive_options(parser)
  options, files = parser.parse_args(args)
  process_isolate_server_options(parser, options, True, True)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  if files == ['-']:
    files = (l.rstrip('\n\r') for l in sys.stdin)
  if not files:
    parser.error('Nothing to upload')
  files = (f.decode('utf-8') for f in files)
  blacklist = tools.gen_blacklist(options.blacklist)
  try:
    with get_storage(server_ref) as storage:
      results, _cold, _hot = archive_files_to_storage(storage, files, blacklist)
  except (Error, local_caching.NoMoreSpace) as e:
    parser.error(e.args[0])
  print('\n'.join('%s %s' % (h, f) for f, h in results.iteritems()))
  return 0


def CMDdownload(parser, args):
  """Download data from the server.

  It can either download individual files or a complete tree from a .isolated
  file.
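
  Example (a sketch; the server URL and hash are hypothetical):
    isolateserver.py download -I https://isolate.example.com \
        -s 4a5bae6c5f8303873a13c2dcc279d858e425ea68 -t /tmp/out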
  """
  add_isolate_server_options(parser)
  parser.add_option(
      '-s', '--isolated', metavar='HASH',
      help='hash of an isolated file, .isolated file content is discarded, use '
           '--file if you need it')
  parser.add_option(
      '-f', '--file', metavar='HASH DEST', default=[], action='append', nargs=2,
      help='hash and destination of a file, can be used multiple times')
  parser.add_option(
      '-t', '--target', metavar='DIR', default='download',
      help='destination directory')
  parser.add_option(
      '--use-symlinks', action='store_true',
      help='Use symlinks instead of hardlinks')
  add_cache_options(parser)
  options, args = parser.parse_args(args)
  if args:
    parser.error('Unsupported arguments: %s' % args)
  if not file_path.enable_symlink():
    logging.error('Symlink support is not enabled')

  process_isolate_server_options(parser, options, True, True)
  if bool(options.isolated) == bool(options.file):
    parser.error('Use one of --isolated or --file, and only one.')
  if not options.cache and options.use_symlinks:
    parser.error('--use-symlinks requires the use of a cache with --cache')

  cache = process_cache_options(options, trim=True)
  cache.cleanup()
  options.target = unicode(os.path.abspath(options.target))
  if options.isolated:
    if (fs.isfile(options.target) or
        (fs.isdir(options.target) and fs.listdir(options.target))):
      parser.error(
          '--target \'%s\' exists, please use another target' % options.target)
  server_ref = isolate_storage.ServerRef(
      options.isolate_server, options.namespace)
  with get_storage(server_ref) as storage:
    # Fetching individual files.
    if options.file:
      # TODO(maruel): Enable cache in this case too.
      channel = threading_utils.TaskChannel()
      pending = {}
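      # async_fetch() posts each digest back on |channel| once its write
      # callback completed; |pending| maps digests to their destinations so
      # completions can be matched up below.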
      for digest, dest in options.file:
        dest = unicode(dest)
        pending[digest] = dest
        storage.async_fetch(
            channel,
            threading_utils.PRIORITY_MED,
            digest,
            local_caching.UNKNOWN_FILE_SIZE,
            functools.partial(
                local_caching.file_write, os.path.join(options.target, dest)))
      while pending:
        fetched = channel.next()
        dest = pending.pop(fetched)
        logging.info('%s: %s', fetched, dest)

    # Fetching the whole isolated tree.
    if options.isolated:
      bundle = fetch_isolated(
          isolated_hash=options.isolated,
          storage=storage,
          cache=cache,
          outdir=options.target,
          use_symlinks=options.use_symlinks)
      cache.trim()
      if bundle.command:
        rel = os.path.join(options.target, bundle.relative_cwd)
        print('To run this test please run from the directory %s:' % rel)
        print('  ' + ' '.join(bundle.command))

  return 0


def add_archive_options(parser):
  parser.add_option(
      '--blacklist',
      action='append', default=list(DEFAULT_BLACKLIST),
      help='List of regexps to use as a blacklist filter when uploading '
           'directories')


def add_isolate_server_options(parser):
  """Adds --isolate-server and --namespace options to parser."""
  parser.add_option(
      '-I', '--isolate-server',
      metavar='URL', default=os.environ.get('ISOLATE_SERVER', ''),
      help='URL of the Isolate Server to use. Defaults to the environment '
           'variable ISOLATE_SERVER if set. No need to specify https://, this '
           'is assumed.')
  parser.add_option(
      '--grpc-proxy', help='gRPC proxy by which to communicate to Isolate')
  parser.add_option(
      '--namespace', default='default-gzip',
      help='The namespace to use on the Isolate Server, default: %default')


def process_isolate_server_options(
    parser, options, set_exception_handler, required):
  """Processes the --isolate-server option.

  Returns the identity as determined by the server.
  """
  if not options.isolate_server:
    if required:
      parser.error('--isolate-server is required.')
    return
1533
aludwind7b7b7e2017-06-29 16:38:50 -07001534 if options.grpc_proxy:
1535 isolate_storage.set_grpc_proxy(options.grpc_proxy)
aludwin81178302016-11-30 17:18:49 -08001536 else:
1537 try:
1538 options.isolate_server = net.fix_url(options.isolate_server)
1539 except ValueError as e:
1540 parser.error('--isolate-server %s' % e)
Marc-Antoine Ruele290ada2014-12-10 19:48:49 -05001541 if set_exception_handler:
1542 on_error.report_on_exception_exit(options.isolate_server)
Marc-Antoine Ruelf7d737d2014-12-10 15:36:29 -05001543 try:
1544 return auth.ensure_logged_in(options.isolate_server)
1545 except ValueError as e:
1546 parser.error(str(e))
Marc-Antoine Ruel8806e622014-02-12 14:15:53 -05001547

def add_cache_options(parser):
  cache_group = optparse.OptionGroup(parser, 'Cache management')
  cache_group.add_option(
      '--cache', metavar='DIR', default='cache',
      help='Directory to keep a local cache of the files. Accelerates download '
           'by reusing already downloaded files. Default=%default')
  cache_group.add_option(
      '--max-cache-size',
      type='int',
      metavar='NNN',
      default=50*1024*1024*1024,
      help='Trim if the cache gets larger than this value, default=%default')
  cache_group.add_option(
      '--min-free-space',
      type='int',
      metavar='NNN',
      default=2*1024*1024*1024,
      help='Trim if disk free space becomes lower than this value, '
           'default=%default')
  cache_group.add_option(
      '--max-items',
      type='int',
      metavar='NNN',
      default=100000,
      help='Trim if more than this number of items are in the cache, '
           'default=%default')
  parser.add_option_group(cache_group)


def process_cache_options(options, trim, **kwargs):
  if options.cache:
    policies = local_caching.CachePolicies(
        options.max_cache_size,
        options.min_free_space,
        options.max_items,
        # 3 weeks.
        max_age_secs=21*24*60*60)

    # |options.cache| path may not exist until DiskContentAddressedCache()
    # instance is created.
    server_ref = isolate_storage.ServerRef(
        options.isolate_server, options.namespace)
    return local_caching.DiskContentAddressedCache(
        unicode(os.path.abspath(options.cache)),
        policies,
        server_ref.hash_algo(),  # pylint: disable=not-callable
        trim,
        **kwargs)
  else:
    return local_caching.MemoryContentAddressedCache()


class OptionParserIsolateServer(logging_utils.OptionParserWithLogging):
  def __init__(self, **kwargs):
    logging_utils.OptionParserWithLogging.__init__(
        self,
        version=__version__,
        prog=os.path.basename(sys.modules[__name__].__file__),
        **kwargs)
    auth.add_auth_options(self)

  def parse_args(self, *args, **kwargs):
    options, args = logging_utils.OptionParserWithLogging.parse_args(
        self, *args, **kwargs)
    auth.process_auth_options(self, options)
    return options, args


def main(args):
  dispatcher = subcommand.CommandDispatcher(__name__)
  return dispatcher.execute(OptionParserIsolateServer(), args)


if __name__ == '__main__':
  subprocess42.inhibit_os_error_reporting()
  fix_encoding.fix_encoding()
  tools.disable_buffering()
  colorama.init()
  sys.exit(main(sys.argv[1:]))